001: // $Id: QUEUE.java,v 1.8 2005/08/08 12:45:43 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Event;
006: import org.jgroups.Message;
007: import org.jgroups.stack.Protocol;
008: import org.jgroups.util.Util;
009:
010: import java.util.Vector;
011:
012: /**
013: * Queuing layer. Upon reception of event START_QUEUEING, all events traveling through
014: * this layer upwards/downwards (depending on direction of event) will be queued. Upon
015: * reception of a STOP_QUEUEING event, all events will be released. Finally, the
016: * queueing flag is reset.
017: * When queueing, only event STOP_QUEUEING (received up or downwards) will be allowed
018: * to release queueing.
019: * @author Bela Ban
020: */
021:
022: public class QUEUE extends Protocol {
023: final Vector up_vec = new Vector();
024: final Vector dn_vec = new Vector();
025: boolean queueing_up = false, queueing_dn = false;
026: Observer my_observer = null;
027:
028: public interface Observer {
029: /** Called before event is added. Blocks until call returns.
030: @param evt The event
031: @param num_events The number of events in the up vector <em>before</em>
032: this event is added
033: @return boolean True if event should be added. False if it should be discarded */
034: boolean addingToUpVector(Event evt, int num_events);
035:
036: /** Called before event is added. Blocks until call returns.
037: @param evt The event
038: @param num_events The number of events in the down vector <em>before</em>
039: this event is added
040: @return boolean True if event should be added. False if it should be discarded */
041: boolean addingToDownVector(Event evt, int num_events);
042: }
043:
044: /** Only 1 observer is allowed. More than one might slow down the system. Will be called
045: when an event is queued (up or down) */
046: public void setObserver(Observer observer) {
047: this .my_observer = observer;
048: }
049:
050: public Vector getUpVector() {
051: return up_vec;
052: }
053:
054: public Vector getDownVector() {
055: return dn_vec;
056: }
057:
058: public boolean getQueueingUp() {
059: return queueing_up;
060: }
061:
062: public boolean getQueueingDown() {
063: return queueing_dn;
064: }
065:
066: /** All protocol names have to be unique ! */
067: public String getName() {
068: return "QUEUE";
069: }
070:
071: public Vector providedUpServices() {
072: Vector ret = new Vector();
073: ret.addElement(new Integer(Event.START_QUEUEING));
074: ret.addElement(new Integer(Event.STOP_QUEUEING));
075: return ret;
076: }
077:
078: public Vector providedDownServices() {
079: Vector ret = new Vector();
080: ret.addElement(new Integer(Event.START_QUEUEING));
081: ret.addElement(new Integer(Event.STOP_QUEUEING));
082: return ret;
083: }
084:
085: /**
086: Queues or passes up events. No queue sync. necessary, as this method is never called
087: concurrently.
088: */
089: public void up(Event evt) {
090: Message msg;
091: Vector event_list; // to be passed up *before* replaying event queue
092: Event e;
093:
094: switch (evt.getType()) {
095:
096: case Event.START_QUEUEING: // start queueing all up events
097: if (log.isInfoEnabled())
098: log.info("received START_QUEUEING");
099: queueing_up = true;
100: return;
101:
102: case Event.STOP_QUEUEING: // stop queueing all up events
103: event_list = (Vector) evt.getArg();
104: if (event_list != null)
105: for (int i = 0; i < event_list.size(); i++)
106: passUp((Event) event_list.elementAt(i));
107:
108: if (log.isInfoEnabled())
109: log.info("replaying up events");
110:
111: for (int i = 0; i < up_vec.size(); i++) {
112: e = (Event) up_vec.elementAt(i);
113: passUp(e);
114: }
115:
116: up_vec.removeAllElements();
117: queueing_up = false;
118: return;
119: }
120:
121: if (queueing_up) {
122: if (log.isInfoEnabled())
123: log.info("queued up event " + evt);
124: if (my_observer != null) {
125: if (my_observer.addingToUpVector(evt, up_vec.size()) == false)
126: return; // discard event (don't queue)
127: }
128: up_vec.addElement(evt);
129: } else
130: passUp(evt); // Pass up to the layer above us
131: }
132:
133: public void down(Event evt) {
134: Message msg;
135: Vector event_list; // to be passed down *before* replaying event queue
136:
137: switch (evt.getType()) {
138:
139: case Event.START_QUEUEING: // start queueing all down events
140: if (log.isInfoEnabled())
141: log.info("received START_QUEUEING");
142: queueing_dn = true;
143: return;
144:
145: case Event.STOP_QUEUEING: // stop queueing all down events
146: if (log.isInfoEnabled())
147: log.info("received STOP_QUEUEING");
148: event_list = (Vector) evt.getArg();
149: if (event_list != null) // play events first (if available)
150: for (int i = 0; i < event_list.size(); i++)
151: passDown((Event) event_list.elementAt(i));
152:
153: if (log.isInfoEnabled())
154: log.info("replaying down events (" + dn_vec.size()
155: + ')');
156:
157: for (int i = 0; i < dn_vec.size(); i++) {
158: passDown((Event) dn_vec.elementAt(i));
159: }
160:
161: dn_vec.removeAllElements();
162: queueing_dn = false;
163: return;
164: }
165:
166: if (queueing_dn) {
167:
168: if (log.isInfoEnabled())
169: log.info("queued down event: " + Util.printEvent(evt));
170:
171: if (my_observer != null) {
172: if (my_observer.addingToDownVector(evt, dn_vec.size()) == false)
173: return; // discard event (don't queue)
174: }
175: dn_vec.addElement(evt);
176: } else
177: passDown(evt); // Pass up to the layer below us
178: }
179:
180: }
|