001: package org.jgroups.protocols.pbcast;
002:
003: import java.io.DataInputStream;
004: import java.io.DataOutputStream;
005: import java.io.IOException;
006: import java.io.ObjectInput;
007: import java.io.ObjectOutput;
008: import java.util.ArrayList;
009: import java.util.Collection;
010: import java.util.HashMap;
011: import java.util.Iterator;
012: import java.util.Map;
013: import java.util.Properties;
014: import java.util.Set;
015: import java.util.TreeSet;
016: import java.util.Vector;
017:
018: import org.jgroups.Address;
019: import org.jgroups.Event;
020: import org.jgroups.Header;
021: import org.jgroups.Message;
022: import org.jgroups.TimeoutException;
023: import org.jgroups.View;
024: import org.jgroups.ViewId;
025: import org.jgroups.stack.Protocol;
026: import org.jgroups.util.Promise;
027: import org.jgroups.util.Streamable;
028: import org.jgroups.util.Util;
029:
030: import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
031:
032: /**
033: * Flush, as it name implies, forces group members to flush their pending messages
034: * while blocking them to send any additional messages. The process of flushing
035: * acquiesces the group so that state transfer or a join can be done. It is also
036: * called stop-the-world model as nobody will be able to send messages while a
037: * flush is in process.
038: *
039: * <p>
040: * Flush is needed for:
041: * <p>
042: * (1) State transfer. When a member requests state transfer, the coordinator
043: * tells everyone to stop sending messages and waits for everyone's ack. Then it asks
044: * the application for its state and ships it back to the requester. After the
045: * requester has received and set the state successfully, the coordinator tells
046: * everyone to resume sending messages.
047: * <p>
048: * (2) View changes (e.g.a join). Before installing a new view V2, flushing would
049: * ensure that all messages *sent* in the current view V1 are indeed *delivered*
050: * in V1, rather than in V2 (in all non-faulty members). This is essentially
051: * Virtual Synchrony.
052: *
053: *
054: *
055: * @author Vladimir Blagojevic
056: * @version $Id$
057: * @since 2.4
058: */
059: public class FLUSH extends Protocol {
060: public static final String NAME = "FLUSH";
061:
062: // GuardedBy ("sharedLock")
063: private View currentView;
064:
065: // GuardedBy ("sharedLock")
066: private Address localAddress;
067:
068: /**
069: * Group member that requested FLUSH.
070: * For view intallations flush coordinator is the group coordinator
071: * For state transfer flush coordinator is the state requesting member
072: */
073: // GuardedBy ("sharedLock")
074: private Address flushCoordinator;
075:
076: // GuardedBy ("sharedLock")
077: private final Collection flushMembers;
078:
079: // GuardedBy ("sharedLock")
080: private final Set flushOkSet;
081:
082: // GuardedBy ("sharedLock")
083: private final Set flushCompletedSet;
084:
085: // GuardedBy ("sharedLock")
086: private final Set stopFlushOkSet;
087:
088: // GuardedBy ("sharedLock")
089: private final Set suspected;
090:
091: private final Object sharedLock = new Object();
092:
093: private final Object blockMutex = new Object();
094:
095: /**
096: * Indicates if FLUSH.down() is currently blocking threads
097: * Condition predicate associated with blockMutex
098: */
099: //GuardedBy ("blockMutex")
100: private boolean isBlockingFlushDown = true;
101:
102: /**
103: * Default timeout for a group member to be in <code>isBlockingFlushDown</code>
104: */
105: private long timeout = 8000;
106:
107: /**
108: * Default timeout started when <code>Event.BLOCK</code> is passed to
109: * application. Response <code>Event.BLOCK_OK</code> should be received by
110: * application within timeout.
111: */
112: private long block_timeout = 10000;
113:
114: // GuardedBy ("sharedLock")
115: private boolean receivedFirstView = false;
116:
117: // GuardedBy ("sharedLock")
118: private boolean receivedMoreThanOneView = false;
119:
120: private long startFlushTime;
121:
122: private long totalTimeInFlush;
123:
124: private int numberOfFlushes;
125:
126: private double averageFlushDuration;
127:
128: private final Promise flush_promise = new Promise();
129:
130: private final Promise blockok_promise = new Promise();
131:
132: private final FlushPhase flushPhase = new FlushPhase();
133:
134: /**
135: * If true configures timeout in GMS and STATE_TRANFER using FLUSH timeout value
136: */
137: private boolean auto_flush_conf = true;
138:
139: public FLUSH() {
140: super ();
141: currentView = new View(new ViewId(), new Vector());
142: flushOkSet = new TreeSet();
143: flushCompletedSet = new TreeSet();
144: stopFlushOkSet = new TreeSet();
145: flushMembers = new ArrayList();
146: suspected = new TreeSet();
147: }
148:
149: public String getName() {
150: return NAME;
151: }
152:
153: public boolean setProperties(Properties props) {
154: super .setProperties(props);
155: timeout = Util.parseLong(props, "timeout", timeout);
156: block_timeout = Util.parseLong(props, "block_timeout",
157: block_timeout);
158: auto_flush_conf = Util.parseBoolean(props, "auto_flush_conf",
159: auto_flush_conf);
160:
161: if (props.size() > 0) {
162: log.error("the following properties are not recognized: "
163: + props);
164: return false;
165: }
166: return true;
167: }
168:
169: public void init() throws Exception {
170: if (auto_flush_conf) {
171: Map map = new HashMap();
172: map.put("flush_timeout", new Long(timeout));
173: passUp(new Event(Event.CONFIG, map));
174: passDown(new Event(Event.CONFIG, map));
175: }
176: }
177:
178: public void start() throws Exception {
179: Map map = new HashMap();
180: map.put("flush_supported", Boolean.TRUE);
181: passUp(new Event(Event.CONFIG, map));
182: passDown(new Event(Event.CONFIG, map));
183:
184: synchronized (sharedLock) {
185: receivedFirstView = false;
186: receivedMoreThanOneView = false;
187: }
188: synchronized (blockMutex) {
189: isBlockingFlushDown = true;
190: }
191: }
192:
193: public void stop() {
194: synchronized (sharedLock) {
195: currentView = new View(new ViewId(), new Vector());
196: flushCompletedSet.clear();
197: flushOkSet.clear();
198: stopFlushOkSet.clear();
199: flushMembers.clear();
200: suspected.clear();
201: flushCoordinator = null;
202: }
203: }
204:
205: /* -------------------JMX attributes and operations --------------------- */
206:
207: public double getAverageFlushDuration() {
208: return averageFlushDuration;
209: }
210:
211: public long getTotalTimeInFlush() {
212: return totalTimeInFlush;
213: }
214:
215: public int getNumberOfFlushes() {
216: return numberOfFlushes;
217: }
218:
219: public boolean startFlush(long timeout) {
220: boolean successfulFlush = false;
221: down(new Event(Event.SUSPEND));
222: flush_promise.reset();
223: try {
224: flush_promise.getResultWithTimeout(timeout);
225: successfulFlush = true;
226: } catch (TimeoutException e) {
227: }
228: return successfulFlush;
229: }
230:
231: public void stopFlush() {
232: down(new Event(Event.RESUME));
233: }
234:
235: /* ------------------- end JMX attributes and operations --------------------- */
236:
237: public void down(Event evt) {
238: switch (evt.getType()) {
239: case Event.MSG:
240: Message msg = (Message) evt.getArg();
241: FlushHeader fh = (FlushHeader) msg.removeHeader(getName());
242: if (fh != null && fh.type == FlushHeader.FLUSH_BYPASS) {
243: break;
244: } else {
245: blockMessageDuringFlush();
246: break;
247: }
248: case Event.GET_STATE:
249: blockMessageDuringFlush();
250: break;
251:
252: case Event.CONNECT:
253: boolean successfulBlock = sendBlockUpToChannel(block_timeout);
254: if (successfulBlock && log.isDebugEnabled()) {
255: log.debug("Blocking of channel " + localAddress
256: + " completed successfully");
257: }
258:
259: break;
260:
261: case Event.SUSPEND:
262: attemptSuspend(evt);
263: return;
264:
265: case Event.RESUME:
266: onResume();
267: return;
268:
269: case Event.BLOCK_OK:
270: blockok_promise.setResult(Boolean.TRUE);
271: return;
272: }
273: passDown(evt);
274: }
275:
276: private void blockMessageDuringFlush() {
277: boolean shouldSuspendByItself = false;
278: long start = 0, stop = 0;
279: synchronized (blockMutex) {
280: while (isBlockingFlushDown) {
281: if (log.isDebugEnabled())
282: log.debug("FLUSH block at " + localAddress
283: + " for "
284: + (timeout <= 0 ? "ever" : timeout + "ms"));
285: try {
286: start = System.currentTimeMillis();
287: if (timeout <= 0)
288: blockMutex.wait();
289: else
290: blockMutex.wait(timeout);
291: stop = System.currentTimeMillis();
292: } catch (InterruptedException e) {
293: }
294: if (isBlockingFlushDown) {
295: isBlockingFlushDown = false;
296: shouldSuspendByItself = true;
297: blockMutex.notifyAll();
298: }
299: }
300: }
301: if (shouldSuspendByItself) {
302: log.warn("unblocking FLUSH.down() at " + localAddress
303: + " after timeout of " + (stop - start) + "ms");
304: passUp(new Event(Event.SUSPEND_OK));
305: passDown(new Event(Event.SUSPEND_OK));
306: }
307: }
308:
309: public void up(Event evt) {
310:
311: Message msg = null;
312: switch (evt.getType()) {
313: case Event.MSG:
314: msg = (Message) evt.getArg();
315: FlushHeader fh = (FlushHeader) msg.removeHeader(getName());
316: if (fh != null) {
317: flushPhase.lock();
318: if (fh.type == FlushHeader.START_FLUSH) {
319: if (!flushPhase.isFlushInProgress()) {
320: flushPhase.setFirstPhase(true);
321: flushPhase.release();
322: boolean successfulBlock = sendBlockUpToChannel(block_timeout);
323: if (successfulBlock && log.isDebugEnabled()) {
324: log.debug("Blocking of channel "
325: + localAddress
326: + " completed successfully");
327: }
328: onStartFlush(msg.getSrc(), fh);
329: } else if (flushPhase.isInFirstPhase()) {
330: flushPhase.release();
331: Address flushRequester = msg.getSrc();
332: Address coordinator = null;
333: synchronized (sharedLock) {
334: coordinator = flushCoordinator;
335: }
336:
337: if (flushRequester.compareTo(coordinator) < 0) {
338: rejectFlush(fh.viewID, coordinator);
339: if (log.isDebugEnabled()) {
340: log
341: .debug("Rejecting flush at "
342: + localAddress
343: + " to current flush coordinator "
344: + coordinator
345: + " and switching flush coordinator to "
346: + flushRequester);
347: }
348: synchronized (sharedLock) {
349: flushCoordinator = flushRequester;
350: }
351: } else {
352: rejectFlush(fh.viewID, flushRequester);
353: if (log.isDebugEnabled()) {
354: log.debug("Rejecting flush at "
355: + localAddress
356: + " to flush requester "
357: + flushRequester);
358: }
359: }
360: } else if (flushPhase.isInSecondPhase()) {
361: flushPhase.release();
362: Address flushRequester = msg.getSrc();
363: rejectFlush(fh.viewID, flushRequester);
364: if (log.isDebugEnabled()) {
365: log
366: .debug("Rejecting flush in second phase at "
367: + localAddress
368: + " to flush requester "
369: + flushRequester);
370: }
371: }
372: } else if (fh.type == FlushHeader.STOP_FLUSH) {
373: flushPhase.setPhases(false, true);
374: flushPhase.release();
375: onStopFlush();
376: } else if (fh.type == FlushHeader.ABORT_FLUSH) {
377: //abort current flush
378: flushPhase.release();
379: passUp(new Event(Event.SUSPEND_FAILED));
380: passDown(new Event(Event.SUSPEND_FAILED));
381:
382: } else if (isCurrentFlushMessage(fh)) {
383: flushPhase.release();
384: if (fh.type == FlushHeader.FLUSH_OK) {
385: onFlushOk(msg.getSrc(), fh.viewID);
386: } else if (fh.type == FlushHeader.STOP_FLUSH_OK) {
387: onStopFlushOk(msg.getSrc(), fh.viewID);
388: } else if (fh.type == FlushHeader.FLUSH_COMPLETED) {
389: onFlushCompleted(msg.getSrc());
390: }
391: } else {
392: flushPhase.release();
393: if (log.isDebugEnabled())
394: log.debug(localAddress
395: + " received outdated FLUSH message "
396: + fh + ",ignoring it.");
397: }
398: return; //do not pass FLUSH msg up
399: }
400: break;
401:
402: case Event.VIEW_CHANGE:
403: //if this is channel's first view and its the only member of the group then the
404: //goal is to pass BLOCK,VIEW,UNBLOCK to application space on the same thread as VIEW.
405: View newView = (View) evt.getArg();
406: boolean firstView = onViewChange(newView);
407: boolean singletonMember = newView.size() == 1
408: && newView.containsMember(localAddress);
409: if (firstView && singletonMember) {
410: passUp(evt);
411: synchronized (blockMutex) {
412: isBlockingFlushDown = false;
413: blockMutex.notifyAll();
414: }
415: if (log.isDebugEnabled())
416: log
417: .debug("At "
418: + localAddress
419: + " unblocking FLUSH.down() and sending UNBLOCK up");
420:
421: passUp(new Event(Event.UNBLOCK));
422: return;
423: }
424: break;
425:
426: case Event.SET_LOCAL_ADDRESS:
427: synchronized (sharedLock) {
428: localAddress = (Address) evt.getArg();
429: }
430: break;
431:
432: case Event.SUSPECT:
433: onSuspect((Address) evt.getArg());
434: break;
435:
436: case Event.SUSPEND:
437: attemptSuspend(evt);
438: return;
439:
440: case Event.RESUME:
441: onResume();
442: return;
443:
444: }
445:
446: passUp(evt);
447: }
448:
449: public Vector providedDownServices() {
450: Vector retval = new Vector(2);
451: retval.addElement(new Integer(Event.SUSPEND));
452: retval.addElement(new Integer(Event.RESUME));
453: return retval;
454: }
455:
456: private void attemptSuspend(Event evt) {
457: View v = (View) evt.getArg();
458: if (log.isDebugEnabled())
459: log.debug("Received SUSPEND at " + localAddress
460: + ", view is " + v);
461:
462: flushPhase.lock();
463: if (!flushPhase.isFlushInProgress()) {
464: flushPhase.release();
465: onSuspend(v);
466: } else {
467: flushPhase.release();
468: passUp(new Event(Event.SUSPEND_FAILED));
469: passDown(new Event(Event.SUSPEND_FAILED));
470: }
471: }
472:
473: private void rejectFlush(long viewId, Address flushRequester) {
474: Message reject = new Message(flushRequester);
475: reject.putHeader(getName(), new FlushHeader(
476: FlushHeader.ABORT_FLUSH, viewId));
477: passDown(new Event(Event.MSG, reject));
478: }
479:
480: private boolean sendBlockUpToChannel(long btimeout) {
481: boolean successfulBlock = false;
482: blockok_promise.reset();
483:
484: new Thread(Util.getGlobalThreadGroup(), new Runnable() {
485: public void run() {
486: passUp(new Event(Event.BLOCK));
487: }
488: }, "FLUSH block").start();
489:
490: try {
491: blockok_promise.getResultWithTimeout(btimeout);
492: successfulBlock = true;
493: } catch (TimeoutException e) {
494: log
495: .warn("Blocking of channel using BLOCK event timed out after "
496: + btimeout + " msec.");
497: }
498: return successfulBlock;
499: }
500:
501: private boolean isCurrentFlushMessage(FlushHeader fh) {
502: return fh.viewID == currentViewId();
503: }
504:
505: private long currentViewId() {
506: long viewId = -1;
507: synchronized (sharedLock) {
508: ViewId view = currentView.getVid();
509: if (view != null) {
510: viewId = view.getId();
511: }
512: }
513: return viewId;
514: }
515:
516: private boolean onViewChange(View view) {
517: boolean amINewCoordinator = false;
518: boolean isThisOurFirstView = false;
519: synchronized (sharedLock) {
520: if (receivedFirstView) {
521: receivedMoreThanOneView = true;
522: }
523: if (!receivedFirstView) {
524: receivedFirstView = true;
525: }
526: isThisOurFirstView = receivedFirstView
527: && !receivedMoreThanOneView;
528: suspected.retainAll(view.getMembers());
529: currentView = view;
530: amINewCoordinator = flushCoordinator != null
531: && !view.getMembers().contains(flushCoordinator)
532: && localAddress.equals(view.getMembers().get(0));
533: }
534:
535: //If coordinator leaves, its STOP FLUSH message will be discarded by
536: //other members at NAKACK layer. Remaining members will be hung, waiting
537: //for STOP_FLUSH message. If I am new coordinator I will complete the
538: //FLUSH and send STOP_FLUSH on flush callers behalf.
539: if (amINewCoordinator) {
540: if (log.isDebugEnabled())
541: log.debug("Coordinator left, " + localAddress
542: + " will complete flush");
543: onResume();
544: }
545:
546: if (log.isDebugEnabled())
547: log.debug("Installing view at " + localAddress
548: + " view is " + view);
549:
550: return isThisOurFirstView;
551: }
552:
553: private void onStopFlush() {
554: if (stats) {
555: long stopFlushTime = System.currentTimeMillis();
556: totalTimeInFlush += (stopFlushTime - startFlushTime);
557: if (numberOfFlushes > 0) {
558: averageFlushDuration = totalTimeInFlush
559: / (double) numberOfFlushes;
560: }
561: }
562: //ack this STOP_FLUSH
563: Message msg = new Message(null);
564: msg.putHeader(getName(), new FlushHeader(
565: FlushHeader.STOP_FLUSH_OK, currentViewId()));
566: passDown(new Event(Event.MSG, msg));
567:
568: if (log.isDebugEnabled())
569: log
570: .debug("Received STOP_FLUSH and sent STOP_FLUSH_OK from "
571: + localAddress);
572: }
573:
574: private void onSuspend(View view) {
575: Message msg = null;
576: Collection participantsInFlush = null;
577: synchronized (sharedLock) {
578: //start FLUSH only on group members that we need to flush
579: if (view != null) {
580: participantsInFlush = new ArrayList(view.getMembers());
581: participantsInFlush.retainAll(currentView.getMembers());
582: } else {
583: participantsInFlush = new ArrayList(currentView
584: .getMembers());
585: }
586: msg = new Message(null);
587: msg.putHeader(getName(), new FlushHeader(
588: FlushHeader.START_FLUSH, currentViewId(),
589: participantsInFlush));
590: }
591: if (participantsInFlush.isEmpty()) {
592: passUp(new Event(Event.SUSPEND_OK));
593: passDown(new Event(Event.SUSPEND_OK));
594: } else {
595: passDown(new Event(Event.MSG, msg));
596: if (log.isDebugEnabled())
597: log.debug("Received SUSPEND at " + localAddress
598: + ", sent START_FLUSH to "
599: + participantsInFlush);
600: }
601: }
602:
603: private void onResume() {
604: long viewID = currentViewId();
605: Message msg = new Message(null);
606: msg.putHeader(getName(), new FlushHeader(
607: FlushHeader.STOP_FLUSH, viewID));
608: passDown(new Event(Event.MSG, msg));
609: if (log.isDebugEnabled())
610: log.debug("Received RESUME at " + localAddress
611: + ", sent STOP_FLUSH to all");
612: }
613:
614: private void onStartFlush(Address flushStarter, FlushHeader fh) {
615: if (stats) {
616: startFlushTime = System.currentTimeMillis();
617: numberOfFlushes += 1;
618: }
619: synchronized (sharedLock) {
620: flushCoordinator = flushStarter;
621: flushMembers.clear();
622: if (fh.flushParticipants != null) {
623: flushMembers.addAll(fh.flushParticipants);
624: }
625: flushMembers.removeAll(suspected);
626: }
627: Message msg = new Message(null);
628: msg.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_OK,
629: fh.viewID));
630: passDown(new Event(Event.MSG, msg));
631: if (log.isDebugEnabled())
632: log.debug("Received START_FLUSH at " + localAddress
633: + " responded with FLUSH_OK");
634: }
635:
636: private void onFlushOk(Address address, long viewID) {
637:
638: boolean flushOkCompleted = false;
639: Message m = null;
640: synchronized (sharedLock) {
641: flushOkSet.add(address);
642: flushOkCompleted = flushOkSet.containsAll(flushMembers);
643: if (flushOkCompleted) {
644: m = new Message(flushCoordinator);
645: }
646:
647: if (log.isDebugEnabled())
648: log.debug("At " + localAddress + " FLUSH_OK from "
649: + address + ",completed " + flushOkCompleted
650: + ", flushOkSet " + flushOkSet.toString());
651: }
652:
653: if (flushOkCompleted) {
654: synchronized (blockMutex) {
655: isBlockingFlushDown = true;
656: }
657: m.putHeader(getName(), new FlushHeader(
658: FlushHeader.FLUSH_COMPLETED, viewID));
659: passDown(new Event(Event.MSG, m));
660: if (log.isDebugEnabled())
661: log
662: .debug(localAddress
663: + " is blocking FLUSH.down(). Sent FLUSH_COMPLETED message to "
664: + flushCoordinator);
665: }
666: }
667:
668: private void onStopFlushOk(Address address, long viewID) {
669:
670: boolean stopFlushOkCompleted = false;
671: synchronized (sharedLock) {
672: stopFlushOkSet.add(address);
673: TreeSet membersCopy = new TreeSet(currentView.getMembers());
674: membersCopy.removeAll(suspected);
675: stopFlushOkCompleted = stopFlushOkSet
676: .containsAll(membersCopy);
677:
678: if (log.isDebugEnabled())
679: log.debug("At " + localAddress + " STOP_FLUSH_OK from "
680: + address + ",completed "
681: + stopFlushOkCompleted + ", stopFlushOkSet "
682: + stopFlushOkSet.toString());
683: }
684:
685: if (stopFlushOkCompleted) {
686: synchronized (sharedLock) {
687: flushCompletedSet.clear();
688: flushOkSet.clear();
689: stopFlushOkSet.clear();
690: flushMembers.clear();
691: suspected.clear();
692: flushCoordinator = null;
693: }
694: flushPhase.lock();
695: flushPhase.setSecondPhase(false);
696: flushPhase.release();
697:
698: if (log.isDebugEnabled())
699: log
700: .debug("At "
701: + localAddress
702: + " unblocking FLUSH.down() and sending UNBLOCK up");
703:
704: synchronized (blockMutex) {
705: isBlockingFlushDown = false;
706: blockMutex.notifyAll();
707: }
708: passUp(new Event(Event.UNBLOCK));
709: }
710: }
711:
712: private void onFlushCompleted(Address address) {
713: boolean flushCompleted = false;
714: synchronized (sharedLock) {
715: flushCompletedSet.add(address);
716: flushCompleted = flushCompletedSet
717: .containsAll(flushMembers);
718:
719: if (log.isDebugEnabled())
720: log.debug("At " + localAddress
721: + " FLUSH_COMPLETED from " + address
722: + ",completed " + flushCompleted
723: + ",flushCompleted "
724: + flushCompletedSet.toString());
725: }
726:
727: if (flushCompleted) {
728: //needed for jmx operation startFlush(timeout);
729: flush_promise.setResult(Boolean.TRUE);
730: passUp(new Event(Event.SUSPEND_OK));
731: passDown(new Event(Event.SUSPEND_OK));
732: if (log.isDebugEnabled())
733: log.debug("All FLUSH_COMPLETED received at "
734: + localAddress + " sent SUSPEND_OK down/up");
735: }
736: }
737:
738: private void onSuspect(Address address) {
739: boolean flushOkCompleted = false;
740: Message m = null;
741: long viewID = 0;
742: synchronized (sharedLock) {
743: suspected.add(address);
744: flushMembers.removeAll(suspected);
745: viewID = currentViewId();
746: flushOkCompleted = !flushOkSet.isEmpty()
747: && flushOkSet.containsAll(flushMembers);
748: if (flushOkCompleted) {
749: m = new Message(flushCoordinator);
750: }
751:
752: if (log.isDebugEnabled())
753: log.debug("Suspect is " + address + ",completed "
754: + flushOkCompleted + ", flushOkSet "
755: + flushOkSet + " flushMembers " + flushMembers);
756: }
757: if (flushOkCompleted) {
758: m.putHeader(getName(), new FlushHeader(
759: FlushHeader.FLUSH_COMPLETED, viewID));
760: passDown(new Event(Event.MSG, m));
761: if (log.isDebugEnabled())
762: log.debug(localAddress
763: + " sent FLUSH_COMPLETED message to "
764: + flushCoordinator);
765: }
766: }
767:
768: private static class FlushPhase {
769: private boolean inFirstFlushPhase = false;
770: private boolean inSecondFlushPhase = false;
771: private final ReentrantLock lock = new ReentrantLock();
772:
773: public FlushPhase() {
774: }
775:
776: public void lock() {
777: try {
778: lock.acquire();
779: } catch (InterruptedException e) {
780: e.printStackTrace();
781: }
782: }
783:
784: public void release() {
785: lock.release();
786: }
787:
788: public void setFirstPhase(boolean inFirstPhase) {
789: inFirstFlushPhase = inFirstPhase;
790: }
791:
792: public void setSecondPhase(boolean inSecondPhase) {
793: inSecondFlushPhase = inSecondPhase;
794: }
795:
796: public void setPhases(boolean inFirstPhase,
797: boolean inSecondPhase) {
798: inFirstFlushPhase = inFirstPhase;
799: inSecondFlushPhase = inSecondPhase;
800: }
801:
802: public boolean isInFirstPhase() {
803: return inFirstFlushPhase;
804: }
805:
806: public boolean isInSecondPhase() {
807: return inSecondFlushPhase;
808: }
809:
810: public boolean isFlushInProgress() {
811: return inFirstFlushPhase || inSecondFlushPhase;
812: }
813: }
814:
815: public static class FlushHeader extends Header implements
816: Streamable {
817: public static final byte START_FLUSH = 0;
818:
819: public static final byte FLUSH_OK = 1;
820:
821: public static final byte STOP_FLUSH = 2;
822:
823: public static final byte FLUSH_COMPLETED = 3;
824:
825: public static final byte STOP_FLUSH_OK = 4;
826:
827: public static final byte ABORT_FLUSH = 5;
828:
829: public static final byte FLUSH_BYPASS = 6;
830:
831: byte type;
832:
833: long viewID;
834:
835: Collection flushParticipants;
836:
837: public FlushHeader() {
838: this (START_FLUSH, 0);
839: } // used for externalization
840:
841: public FlushHeader(byte type) {
842: this (type, 0);
843: }
844:
845: public FlushHeader(byte type, long viewID) {
846: this (type, viewID, null);
847: }
848:
849: public FlushHeader(byte type, long viewID, Collection flushView) {
850: this .type = type;
851: this .viewID = viewID;
852: this .flushParticipants = flushView;
853: }
854:
855: public String toString() {
856: switch (type) {
857: case START_FLUSH:
858: return "FLUSH[type=START_FLUSH,viewId=" + viewID
859: + ",members=" + flushParticipants + "]";
860: case FLUSH_OK:
861: return "FLUSH[type=FLUSH_OK,viewId=" + viewID + "]";
862: case STOP_FLUSH:
863: return "FLUSH[type=STOP_FLUSH,viewId=" + viewID + "]";
864: case STOP_FLUSH_OK:
865: return "FLUSH[type=STOP_FLUSH_OK,viewId=" + viewID
866: + "]";
867: case ABORT_FLUSH:
868: return "FLUSH[type=ABORT_FLUSH,viewId=" + viewID + "]";
869: case FLUSH_COMPLETED:
870: return "FLUSH[type=FLUSH_COMPLETED,viewId=" + viewID
871: + "]";
872: case FLUSH_BYPASS:
873: return "FLUSH[type=FLUSH_BYPASS,viewId=" + viewID + "]";
874: default:
875: return "[FLUSH: unknown type (" + type + ")]";
876: }
877: }
878:
879: public void writeExternal(ObjectOutput out) throws IOException {
880: out.writeByte(type);
881: out.writeLong(viewID);
882: out.writeObject(flushParticipants);
883: }
884:
885: public void readExternal(ObjectInput in) throws IOException,
886: ClassNotFoundException {
887: type = in.readByte();
888: viewID = in.readLong();
889: flushParticipants = (Collection) in.readObject();
890: }
891:
892: public void writeTo(DataOutputStream out) throws IOException {
893: out.writeByte(type);
894: out.writeLong(viewID);
895: if (flushParticipants != null
896: && !flushParticipants.isEmpty()) {
897: out.writeShort(flushParticipants.size());
898: for (Iterator iter = flushParticipants.iterator(); iter
899: .hasNext();) {
900: Address address = (Address) iter.next();
901: Util.writeAddress(address, out);
902: }
903: } else {
904: out.writeShort(0);
905: }
906: }
907:
908: public void readFrom(DataInputStream in) throws IOException,
909: IllegalAccessException, InstantiationException {
910: type = in.readByte();
911: viewID = in.readLong();
912: int flushParticipantsSize = in.readShort();
913: if (flushParticipantsSize > 0) {
914: flushParticipants = new ArrayList(flushParticipantsSize);
915: for (int i = 0; i < flushParticipantsSize; i++) {
916: flushParticipants.add(Util.readAddress(in));
917: }
918: }
919: }
920: }
921: }
|