001: // $Id: TOTAL.java,v 1.13 2006/01/03 14:11:29 belaban Exp $
002: package org.jgroups.protocols;
003:
004: import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
005: import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
006: import org.jgroups.*;
007: import org.jgroups.stack.AckSenderWindow;
008: import org.jgroups.stack.Protocol;
009: import org.jgroups.util.TimeScheduler;
010: import org.jgroups.util.Streamable;
011:
012: import java.io.*;
013: import java.util.*;
014:
015: /**
016: * Implements the total ordering layer using a message sequencer
017: * <p/>
018: * <p/>
019: * The protocol guarantees that all bcast sent messages will be delivered in
020: * the same order to all members. For that it uses a sequencer which assigns
021: * monotonically increasing sequence ID to broadcasts. Then all group members
022: * deliver the bcasts in ascending sequence ID order.
023: * <p/>
024: * <ul>
025: * <li>
026: * When a bcast message comes down to this layer, it is placed in the pending
027: * down queue. A bcast request is sent to the sequencer.</li>
028: * <li>
029: * When the sequencer receives a bcast request, it creates a bcast reply
030: * message and assigns to it a monotonically increasing seqID and sends it back
031: * to the source of the bcast request.</li>
032: * <li>
033: * When a broadcast reply is received, the corresponding bcast message is
034: * assigned the received seqID. Then it is broadcast.</li>
035: * <li>
036: * Received bcasts are placed in the up queue. The queue is sorted according
037: * to the seqID of the bcast. Any message at the head of the up queue with a
038: * seqID equal to the next expected seqID is delivered to the layer above.</li>
039: * <li>
040: * Unicast messages coming from the layer below are forwarded above.</li>
041: * <li>
042: * Unicast messages coming from the layer above are forwarded below.</li>
043: * </ul>
044: * <p/>
045: * <i>Please note that once a <code>BLOCK_OK</code> is acknowledged messages
046: * coming from above are discarded!</i> Either the application must stop
047: * sending messages when a <code>BLOCK</code> event is received from the
048: * channel or a QUEUE layer should be placed above this one. Received messages
049: * are still delivered above though.
050: * <p/>
051: * bcast requests are retransmitted periodically until a bcast reply is
052: * received. In case a BCAST_REP is on its way during a BCAST_REQ
053: * retransmission, then the next BCAST_REP will be to a non-existing
054: * BCAST_REQ. So, a null BCAST message is sent to fill the created gap in
055: * the seqID of all members.
056: *
057: * @author i.georgiadis@doc.ic.ac.uk
058: * @author Bela Ban
059: */
060: public class TOTAL extends Protocol {
061: /**
062: * The header processed by the TOTAL layer and intended for TOTAL
063: * inter-stack communication
064: */
065: public static class Header extends org.jgroups.Header implements
066: Streamable {
067: // Header types
068: /**
069: * Null value for the tag
070: */
071: public static final int NULL_TYPE = -1;
072: /**
073: * Request to broadcast by the source
074: */
075: public static final int REQ = 0;
076: /**
077: * Reply to broadcast request.
078: */
079: public static final int REP = 1;
080: /**
081: * Unicast message
082: */
083: public static final int UCAST = 2;
084: /**
085: * Broadcast Message
086: */
087: public static final int BCAST = 3;
088:
089: /**
090: * The header's type tag
091: */
092: public int type;
093: /**
094: * The ID used by the message source to match replies from the
095: * sequencer
096: */
097: public long localSequenceID;
098: /**
099: * The ID imposing the total order of messages
100: */
101: public long sequenceID;
102:
103: /**
104: * used for externalization
105: */
106: public Header() {
107: }
108:
109: /**
110: * Create a header for the TOTAL layer
111: *
112: * @param type the header's type
113: * @param localSeqID the ID used by the sender of broadcasts to match
114: * requests with replies from the sequencer
115: * @param seqID the ID imposing the total order of messages
116: * @throws IllegalArgumentException if the provided header type is
117: * unknown
118: */
119: public Header(int type, long localSeqID, long seqID) {
120: super ();
121: switch (type) {
122: case REQ:
123: case REP:
124: case UCAST:
125: case BCAST:
126: this .type = type;
127: break;
128: default:
129: this .type = NULL_TYPE;
130: throw new IllegalArgumentException("type");
131: }
132: this .localSequenceID = localSeqID;
133: this .sequenceID = seqID;
134: }
135:
136: /**
137: * For debugging purposes
138: */
139: public String toString() {
140: StringBuffer buffer = new StringBuffer();
141: String typeName;
142: buffer.append("[TOTAL.Header");
143: switch (type) {
144: case REQ:
145: typeName = "REQ";
146: break;
147: case REP:
148: typeName = "REP";
149: break;
150: case UCAST:
151: typeName = "UCAST";
152: break;
153: case BCAST:
154: typeName = "BCAST";
155: break;
156: case NULL_TYPE:
157: typeName = "NULL_TYPE";
158: break;
159: default:
160: typeName = "";
161: break;
162: }
163: buffer.append(", type=" + typeName);
164: buffer.append(", " + "localID=" + localSequenceID);
165: buffer.append(", " + "seqID=" + sequenceID);
166: buffer.append(']');
167:
168: return (buffer.toString());
169: }
170:
171: /**
172: * Manual serialization
173: */
174: public void writeExternal(ObjectOutput out) throws IOException {
175: out.writeInt(type);
176: out.writeLong(localSequenceID);
177: out.writeLong(sequenceID);
178: }
179:
180: /**
181: * Manual deserialization
182: */
183: public void readExternal(ObjectInput in) throws IOException,
184: ClassNotFoundException {
185: type = in.readInt();
186: localSequenceID = in.readLong();
187: sequenceID = in.readLong();
188: }
189:
190: public void writeTo(DataOutputStream out) throws IOException {
191: out.writeInt(type);
192: out.writeLong(localSequenceID);
193: out.writeLong(sequenceID);
194: }
195:
196: public void readFrom(DataInputStream in) throws IOException,
197: IllegalAccessException, InstantiationException {
198: type = in.readInt();
199: localSequenceID = in.readLong();
200: sequenceID = in.readLong();
201: }
202:
203: public long size() {
204: return Global.INT_SIZE + Global.LONG_SIZE * 2;
205: }
206:
207: }
208:
209: /**
210: * The retransmission listener - It is called by the
211: * <code>AckSenderWindow</code> when a retransmission should occur
212: */
213: private class Command implements AckSenderWindow.RetransmitCommand {
214: Command() {
215: }
216:
217: public void retransmit(long seqNo, Message msg) {
218: _retransmitBcastRequest(seqNo);
219: }
220: }
221:
222: /**
223: * Protocol name
224: */
225: private static final String PROT_NAME = "TOTAL";
226: /**
227: * Property names
228: */
229: private static final String TRACE_PROP = "trace";
230:
231: /**
232: * Average time between broadcast request retransmissions
233: */
234: private final long[] AVG_RETRANSMIT_INTERVAL = new long[] { 1000,
235: 2000, 3000, 4000 };
236:
237: /**
238: * Null value for the IDs
239: */
240: private static final long NULL_ID = -1;
241: // Layer sending states
242: /**
243: * No group has been joined yet
244: */
245: private static final int NULL_STATE = -1;
246: /**
247: * When set, all messages are sent/received
248: */
249: private static final int RUN = 0;
250: /**
251: * When set, only session-specific messages are sent/received, i.e. only
252: * messages essential to the session's integrity
253: */
254: private static final int FLUSH = 1;
255: /**
256: * No message is sent to the layer below
257: */
258: private static final int BLOCK = 2;
259:
260: /**
261: * The state lock allowing multiple reads or a single write
262: */
263: private final ReadWriteLock stateLock = new WriterPreferenceReadWriteLock();
264: /**
265: * Protocol layer message-sending state
266: */
267: private int state = NULL_STATE;
268: /**
269: * The address of this stack
270: */
271: private Address addr = null;
272: /**
273: * The address of the sequencer
274: */
275: private Address sequencerAddr = null;
276: /**
277: * The sequencer's seq ID. The ID of the most recently broadcast reply
278: * message
279: */
280: private long sequencerSeqID = NULL_ID;
281: /**
282: * The local sequence ID, i.e. the ID sent with the last broadcast request
283: * message. This is increased with every broadcast request sent to the
284: * sequencer and it's used to match the requests with the sequencer's
285: * replies
286: */
287: private long localSeqID = NULL_ID;
288: /**
289: * The total order sequence ID. This is the ID of the most recently
290: * delivered broadcast message. As the sequence IDs are increasing without
291: * gaps, this is used to detect missing broadcast messages
292: */
293: private long seqID = NULL_ID;
294: /**
295: * The list of unanswered broadcast requests to the sequencer. The entries
296: * are stored in increasing local sequence ID, i.e. in the order they were
297: * <p/>
298: * sent localSeqID -> Broadcast msg to be sent.
299: */
300: private SortedMap reqTbl;
301: /**
302: * The list of received broadcast messages that haven't yet been delivered
303: * to the layer above. The entries are stored in increasing sequence ID,
304: * i.e. in the order they must be delivered above
305: * <p/>
306: * seqID -> Received broadcast msg
307: */
308: private SortedMap upTbl;
309: /**
310: * Retranmitter for pending broadcast requests
311: */
312: private AckSenderWindow retransmitter;
313:
314: /**
315: * Print addresses in host_ip:port form to bypass DNS
316: */
317: private String addrToString(Object addr) {
318: return (addr == null ? "<null>"
319: : ((addr instanceof org.jgroups.stack.IpAddress) ? (((org.jgroups.stack.IpAddress) addr)
320: .getIpAddress().getHostAddress() + ':' + ((org.jgroups.stack.IpAddress) addr)
321: .getPort())
322: : addr.toString()));
323: }
324:
325: /**
326: * @return this protocol's name
327: */
328: public String getName() {
329: return PROT_NAME;
330: }
331:
332: /**
333: * Configure the protocol based on the given list of properties
334: *
335: * @param properties the list of properties to use to setup this layer
336: * @return false if there was any unrecognized property or a property with
337: * an invalid value
338: */
339: public boolean setProperties(Properties properties) {
340: String value;
341:
342: // trace
343: // Parse & remove property but ignore it; use Trace.trace instead
344: value = properties.getProperty(TRACE_PROP);
345: if (value != null)
346: properties.remove(TRACE_PROP);
347: if (properties.size() > 0) {
348: if (log.isErrorEnabled())
349: log
350: .error("The following properties are not recognized: "
351: + properties);
352: return (false);
353: }
354: return (true);
355: }
356:
357: /**
358: * Events that some layer below must handle
359: *
360: * @return the set of <code>Event</code>s that must be handled by some layer
361: * below
362: */
363: public Vector requiredDownServices() {
364: return new Vector();
365: }
366:
367: /**
368: * Events that some layer above must handle
369: *
370: * @return the set of <code>Event</code>s that must be handled by some
371: * layer above
372: */
373: public Vector requiredUpServices() {
374: return new Vector();
375: }
376:
377: /**
378: * Extract as many messages as possible from the pending up queue and send
379: * them to the layer above
380: */
381: private void _deliverBcast() {
382: Message msg;
383: Header header;
384:
385: synchronized (upTbl) {
386: while ((msg = (Message) upTbl.remove(new Long(seqID + 1))) != null) {
387: header = (Header) msg.removeHeader(getName());
388: if (header.localSequenceID != NULL_ID)
389: passUp(new Event(Event.MSG, msg));
390: ++seqID;
391: }
392: } // synchronized(upTbl)
393: }
394:
395: /**
396: * Add all undelivered bcasts sent by this member in the req queue and then
397: * replay this queue
398: */
399: private void _replayBcast() {
400: Iterator it;
401: Message msg;
402: Header header;
403:
404: // i. Remove all undelivered bcasts sent by this member and place them
405: // again in the pending bcast req queue
406:
407: synchronized (upTbl) {
408: if (upTbl.size() > 0)
409: if (log.isInfoEnabled())
410: log.info("Replaying undelivered bcasts");
411:
412: it = upTbl.entrySet().iterator();
413: while (it.hasNext()) {
414: msg = (Message) ((Map.Entry) it.next()).getValue();
415: it.remove();
416: if (!msg.getSrc().equals(addr)) {
417: if (log.isInfoEnabled())
418: log
419: .info("During replay: "
420: + "discarding BCAST["
421: + ((TOTAL.Header) msg
422: .getHeader(getName())).sequenceID
423: + "] from "
424: + addrToString(msg.getSrc()));
425: continue;
426: }
427: header = (Header) msg.removeHeader(getName());
428: if (header.localSequenceID == NULL_ID)
429: continue;
430: _sendBcastRequest(msg, header.localSequenceID);
431: }
432: } // synchronized(upTbl)
433: }
434:
435: /**
436: * Send a unicast message: Add a <code>UCAST</code> header
437: *
438: * @param msg the message to unicast
439: * @return the message to send
440: */
441: private Message _sendUcast(Message msg) {
442: msg.putHeader(getName(), new Header(Header.UCAST, NULL_ID,
443: NULL_ID));
444: return (msg);
445: }
446:
447: /**
448: * Replace the original message with a broadcast request sent to the
449: * sequencer. The original bcast message is stored locally until a reply to
450: * bcast is received from the sequencer. This function has the side-effect
451: * of increasing the <code>localSeqID</code>
452: *
453: * @param msg the message to broadcast
454: */
455: private void _sendBcastRequest(Message msg) {
456: _sendBcastRequest(msg, ++localSeqID);
457: }
458:
459: /**
460: * Replace the original message with a broadcast request sent to the
461: * sequencer. The original bcast message is stored locally until a reply
462: * to bcast is received from the sequencer
463: *
464: * @param msg the message to broadcast
465: * @param id the local sequence ID to use
466: */
467: private void _sendBcastRequest(Message msg, long id) {
468:
469: // i. Store away the message while waiting for the sequencer's reply
470: // ii. Send a bcast request immediatelly and also schedule a
471: // retransmission
472: synchronized (reqTbl) {
473: reqTbl.put(new Long(id), msg);
474: }
475: _transmitBcastRequest(id);
476: retransmitter.add(id, msg);
477: }
478:
479: /**
480: * Send the bcast request with the given localSeqID
481: *
482: * @param seqID the local sequence id of the
483: */
484: private void _transmitBcastRequest(long seqID) {
485: Message reqMsg;
486:
487: // i. If NULL_STATE, then ignore, just transient state before
488: // shutting down the retransmission thread
489: // ii. If blocked, be patient - reschedule
490: // iii. If the request is not pending any more, acknowledge it
491: // iv. Create a broadcast request and send it to the sequencer
492:
493: if (state == NULL_STATE) {
494: if (log.isInfoEnabled())
495: log.info("Transmit BCAST_REQ[" + seqID
496: + "] in NULL_STATE");
497: return;
498: }
499: if (state == BLOCK)
500: return;
501:
502: synchronized (reqTbl) {
503: if (!reqTbl.containsKey(new Long(seqID))) {
504: retransmitter.ack(seqID);
505: return;
506: }
507: }
508: reqMsg = new Message(sequencerAddr, addr, new byte[0]);
509: reqMsg.putHeader(getName(), new Header(Header.REQ, seqID,
510: NULL_ID));
511:
512: passDown(new Event(Event.MSG, reqMsg));
513: }
514:
515: /**
516: * Receive a unicast message: Remove the <code>UCAST</code> header
517: *
518: * @param msg the received unicast message
519: */
520: private void _recvUcast(Message msg) {
521: msg.removeHeader(getName());
522: }
523:
524: /**
525: * Receive a broadcast message: Put it in the pending up queue and then
526: * try to deliver above as many messages as possible
527: *
528: * @param msg the received broadcast message
529: */
530: private void _recvBcast(Message msg) {
531: Header header = (Header) msg.getHeader(getName());
532:
533: // i. Put the message in the up pending queue only if it's not
534: // already there, as it seems that the event may be received
535: // multiple times before a view change when all members are
536: // negotiating a common set of stable msgs
537: //
538: // ii. Deliver as many messages as possible
539:
540: synchronized (upTbl) {
541: if (header.sequenceID <= seqID)
542: return;
543: upTbl.put(new Long(header.sequenceID), msg);
544: }
545:
546: _deliverBcast();
547: }
548:
549: /**
550: * Received a bcast request - Ignore if not the sequencer, else send a
551: * bcast reply
552: *
553: * @param msg the broadcast request message
554: */
555: private void _recvBcastRequest(Message msg) {
556: Header header;
557: Message repMsg;
558:
559: // i. If blocked, discard the bcast request
560: // ii. Assign a seqID to the message and send it back to the requestor
561:
562: if (!addr.equals(sequencerAddr)) {
563: if (log.isErrorEnabled())
564: log.error("Received bcast request "
565: + "but not a sequencer");
566: return;
567: }
568: if (state == BLOCK) {
569: if (log.isInfoEnabled())
570: log.info("Blocked, discard bcast req");
571: return;
572: }
573: header = (Header) msg.getHeader(getName());
574: ++sequencerSeqID;
575: repMsg = new Message(msg.getSrc(), addr, new byte[0]);
576: repMsg.putHeader(getName(), new Header(Header.REP,
577: header.localSequenceID, sequencerSeqID));
578:
579: passDown(new Event(Event.MSG, repMsg));
580: }
581:
582: /**
583: * Received a bcast reply - Match with the pending bcast request and move
584: * the message in the list of messages to be delivered above
585: *
586: * @param header the header of the bcast reply
587: */
588: private void _recvBcastReply(Header header) {
589: Message msg;
590: long id;
591:
592: // i. If blocked, discard the bcast reply
593: //
594: // ii. Assign the received seqID to the message and broadcast it
595: //
596: // iii.
597: // - Acknowledge the message to the retransmitter
598: // - If non-existent BCAST_REQ, send a fake bcast to avoid seqID gaps
599: // - If localID == NULL_ID, it's a null BCAST, else normal BCAST
600: // - Set the seq ID of the message to the one sent by the sequencer
601:
602: if (state == BLOCK) {
603: if (log.isInfoEnabled())
604: log.info("Blocked, discard bcast rep");
605: return;
606: }
607:
608: synchronized (reqTbl) {
609: msg = (Message) reqTbl.remove(new Long(
610: header.localSequenceID));
611: }
612:
613: if (msg != null) {
614: retransmitter.ack(header.localSequenceID);
615: id = header.localSequenceID;
616: } else {
617: if (log.isInfoEnabled())
618: log.info("Bcast reply to " + "non-existent BCAST_REQ["
619: + header.localSequenceID
620: + "], Sending NULL bcast");
621: id = NULL_ID;
622: msg = new Message(null, addr, new byte[0]);
623: }
624: msg.putHeader(getName(), new Header(Header.BCAST, id,
625: header.sequenceID));
626:
627: passDown(new Event(Event.MSG, msg));
628: }
629:
630: /**
631: * Resend the bcast request with the given localSeqID
632: *
633: * @param seqID the local sequence id of the
634: */
635: private void _retransmitBcastRequest(long seqID) {
636: // *** Get a shared lock
637: try {
638: stateLock.readLock().acquire();
639: try {
640: if (log.isInfoEnabled())
641: log.info("Retransmit BCAST_REQ[" + seqID + ']');
642: _transmitBcastRequest(seqID);
643: } finally {
644: stateLock.readLock().release();
645: }
646: } catch (InterruptedException e) {
647: log.error("failed acquiring a read lock", e);
648: }
649: }
650:
651: /* Up event handlers
652: * If the return value is true the event travels further up the stack
653: * else it won't be forwarded
654: */
655:
656: /**
657: * Prepare for a VIEW_CHANGE: switch to flushing state
658: *
659: * @return true if the event is to be forwarded further up
660: */
661: private boolean _upBlock() {
662: // *** Get an exclusive lock
663: try {
664: stateLock.writeLock().acquire();
665: try {
666: state = FLUSH;
667: // *** Revoke the exclusive lock
668: } finally {
669: stateLock.writeLock().release();
670: }
671: } catch (InterruptedException e) {
672: log.error("failed acquiring the write lock", e);
673: }
674:
675: return (true);
676: }
677:
678: /**
679: * Handle an up MSG event
680: *
681: * @param event the MSG event
682: * @return true if the event is to be forwarded further up
683: */
684: private boolean _upMsg(Event event) {
685: Message msg;
686: Object obj;
687: Header header;
688:
689: // *** Get a shared lock
690: try {
691: stateLock.readLock().acquire();
692: try {
693:
694: // If NULL_STATE, shouldn't receive any msg on the up queue!
695: if (state == NULL_STATE) {
696: if (log.isErrorEnabled())
697: log.error("Up msg in NULL_STATE");
698: return (false);
699: }
700:
701: // Peek the header:
702: //
703: // (UCAST) A unicast message - Send up the stack
704: // (BCAST) A broadcast message - Handle specially
705: // (REQ) A broadcast request - Handle specially
706: // (REP) A broadcast reply from the sequencer - Handle specially
707: msg = (Message) event.getArg();
708: if (!((obj = msg.getHeader(getName())) instanceof TOTAL.Header)) {
709: if (log.isErrorEnabled())
710: log.error("No TOTAL.Header found");
711: return (false);
712: }
713: header = (Header) obj;
714:
715: switch (header.type) {
716: case Header.UCAST:
717: _recvUcast(msg);
718: return (true);
719: case Header.BCAST:
720: _recvBcast(msg);
721: return (false);
722: case Header.REQ:
723: _recvBcastRequest(msg);
724: return (false);
725: case Header.REP:
726: _recvBcastReply(header);
727: return (false);
728: default:
729: if (log.isErrorEnabled())
730: log.error("Unknown header type");
731: return (false);
732: }
733:
734: // ** Revoke the shared lock
735: } finally {
736: stateLock.readLock().release();
737: }
738: } catch (InterruptedException e) {
739: if (log.isErrorEnabled())
740: log.error(e.getMessage());
741: }
742:
743: return (true);
744: }
745:
746: /**
747: * Set the address of this group member
748: *
749: * @param event the SET_LOCAL_ADDRESS event
750: * @return true if event should be forwarded further up
751: */
752: private boolean _upSetLocalAddress(Event event) {
753: // *** Get an exclusive lock
754: try {
755: stateLock.writeLock().acquire();
756: try {
757: addr = (Address) event.getArg();
758: } finally {
759: stateLock.writeLock().release();
760: }
761: } catch (InterruptedException e) {
762: log.error(e.getMessage());
763: }
764: return (true);
765: }
766:
767: /**
768: * Handle view changes
769: * <p/>
770: * param event the VIEW_CHANGE event
771: *
772: * @return true if the event should be forwarded to the layer above
773: */
774: private boolean _upViewChange(Event event) {
775: Object oldSequencerAddr;
776:
777: // *** Get an exclusive lock
778: try {
779: stateLock.writeLock().acquire();
780: try {
781:
782: state = RUN;
783:
784: // i. See if this member is the sequencer
785: // ii. If this is the sequencer, reset the sequencer's sequence ID
786: // iii. Reset the last received sequence ID
787: //
788: // iv. Replay undelivered bcasts: Put all the undelivered bcasts
789: // sent by us back to the req queue and discard the rest
790: oldSequencerAddr = sequencerAddr;
791: sequencerAddr = (Address) ((View) event.getArg())
792: .getMembers().elementAt(0);
793: if (addr.equals(sequencerAddr)) {
794: sequencerSeqID = NULL_ID;
795: if ((oldSequencerAddr == null)
796: || (!addr.equals(oldSequencerAddr)))
797: if (log.isInfoEnabled())
798: log.info("I'm the new sequencer");
799: }
800: seqID = NULL_ID;
801: _replayBcast();
802:
803: // *** Revoke the exclusive lock
804: } finally {
805: stateLock.writeLock().release();
806: }
807: } catch (InterruptedException e) {
808: log.error(e.getMessage());
809: }
810:
811: return (true);
812: }
813:
814: /*
815: * Down event handlers
816: * If the return value is true the event travels further down the stack
817: * else it won't be forwarded
818: */
819:
820: /**
821: * Blocking confirmed - No messages should come from above until a
822: * VIEW_CHANGE event is received. Switch to blocking state.
823: *
824: * @return true if event should travel further down
825: */
826: private boolean downBlockOk() {
827: // *** Get an exclusive lock
828: try {
829: stateLock.writeLock().acquire();
830: try {
831: state = BLOCK;
832: } finally {
833: stateLock.writeLock().release();
834: }
835: } catch (InterruptedException e) {
836: log.error(e.getMessage());
837: }
838:
839: return (true);
840: }
841:
842: /**
843: * A MSG event travelling down the stack. Forward unicast messages, treat
844: * specially the broadcast messages.<br>
845: * <p/>
846: * If in <code>BLOCK</code> state, i.e. it has replied to a
847: * <code>BLOCk_OK</code> and hasn't yet received a
848: * <code>VIEW_CHANGE</code> event, messages are discarded<br>
849: * <p/>
850: * If in <code>FLUSH</code> state, forward unicast but queue broadcasts
851: *
852: * @param event the MSG event
853: * @return true if event should travel further down
854: */
855: private boolean _downMsg(Event event) {
856: Message msg;
857:
858: // *** Get a shared lock
859: try {
860: stateLock.readLock().acquire();
861: try {
862:
863: // i. Discard all msgs, if in NULL_STATE
864: // ii. Discard all msgs, if blocked
865: if (state == NULL_STATE) {
866: if (log.isErrorEnabled())
867: log.error("Discard msg in NULL_STATE");
868: return (false);
869: }
870: if (state == BLOCK) {
871: if (log.isErrorEnabled())
872: log.error("Blocked, discard msg");
873: return (false);
874: }
875:
876: msg = (Message) event.getArg();
877: if (msg.getDest() == null) {
878: _sendBcastRequest(msg);
879: return (false);
880: } else {
881: msg = _sendUcast(msg);
882: event.setArg(msg);
883: }
884:
885: // ** Revoke the shared lock
886: } finally {
887: stateLock.readLock().release();
888: }
889: } catch (InterruptedException e) {
890: log.error(e.getMessage());
891: }
892:
893: return (true);
894: }
895:
896: /**
897: * Prepare this layer to receive messages from above
898: */
899: public void start() throws Exception {
900: TimeScheduler timer;
901:
902: timer = stack != null ? stack.timer : null;
903: if (timer == null)
904: throw new Exception("TOTAL.start(): timer is null");
905:
906: reqTbl = new TreeMap();
907: upTbl = new TreeMap();
908: retransmitter = new AckSenderWindow(new Command(),
909: AVG_RETRANSMIT_INTERVAL);
910: }
911:
912: /**
913: * Handle the stop() method travelling down the stack.
914: * <p/>
915: * The local addr is set to null, since after a Start->Stop->Start
916: * sequence this member's addr is not guaranteed to be the same
917: */
918: public void stop() {
919: try {
920: stateLock.writeLock().acquire();
921: try {
922: state = NULL_STATE;
923: retransmitter.reset();
924: reqTbl.clear();
925: upTbl.clear();
926: addr = null;
927: } finally {
928: stateLock.writeLock().release();
929: }
930: } catch (InterruptedException e) {
931: log.error(e.getMessage());
932: }
933: }
934:
935: /**
936: * Process an event coming from the layer below
937: *
938: * @param event the event to process
939: */
940: public void up(Event event) {
941: switch (event.getType()) {
942: case Event.BLOCK:
943: if (!_upBlock())
944: return;
945: break;
946: case Event.MSG:
947: if (!_upMsg(event))
948: return;
949: break;
950: case Event.SET_LOCAL_ADDRESS:
951: if (!_upSetLocalAddress(event))
952: return;
953: break;
954: case Event.VIEW_CHANGE:
955: if (!_upViewChange(event))
956: return;
957: break;
958: default:
959: break;
960: }
961:
962: passUp(event);
963: }
964:
965: /**
966: * Process an event coming from the layer above
967: *
968: * @param event the event to process
969: */
970: public void down(Event event) {
971: switch (event.getType()) {
972: case Event.BLOCK_OK:
973: if (!downBlockOk())
974: return;
975: break;
976: case Event.MSG:
977: if (!_downMsg(event))
978: return;
979: break;
980: default:
981: break;
982: }
983:
984: passDown(event);
985: }
986:
/**
 * Create the TOTAL layer. The tables and the retransmitter are not set up
 * here but in <code>start()</code>
 */
public TOTAL() {
    super();
}
992:
993: }
|