001: // $Id: MessageProtocol.java,v 1.6 2006/04/05 05:32:24 belaban Exp $
002:
003: package org.jgroups.stack;
004:
005: import org.jgroups.*;
006: import org.jgroups.blocks.GroupRequest;
007: import org.jgroups.blocks.RequestCorrelator;
008: import org.jgroups.blocks.RequestHandler;
009: import org.jgroups.util.Rsp;
010: import org.jgroups.util.RspList;
011: import org.jgroups.util.Util;
012:
013: import java.util.Vector;
014:
015: /**
016: * Based on Protocol, but incorporates RequestCorrelator and GroupRequest: the latter can
017: * be used to mcast messages to all members and receive their reponses.<p>
018: * A protocol based on this template can send messages to all members and receive all, a single,
019: * n, or none responses. Requests directed towards the protocol can be handled by overriding
020: * method <code>Handle</code>.<p>
021: * Requests and responses are in the form of <code>Message</code>s, which would typically need to
022: * contain information pertaining to the request/response, e.g. in the form of objects contained
023: * in the message. To use remote method calls, use <code>RpcProtocol</code> instead.<p>
024: * Typical use of of a <code>MessageProtocol</code> would be when a protocol needs to interact with
025: * its peer protocols at each of the members' protocol stacks. A simple protocol like fragmentation,
026: * which does not need to interact with other instances of fragmentation, may simply subclass
027: * <code>Protocol</code> instead.
028: * @author Bela Ban
029: */
030: public abstract class MessageProtocol extends Protocol implements
031: RequestHandler {
032: protected RequestCorrelator _corr = null;
033: protected final Vector members = new Vector();
034:
035: public void start() throws Exception {
036: if (_corr == null)
037: _corr = new RequestCorrelator(getName(), this , this );
038: _corr.start();
039: }
040:
041: public void stop() {
042: if (_corr != null) {
043: _corr.stop();
044: // _corr=null;
045: }
046: }
047:
048: /**
049: Cast a message to all members, and wait for <code>mode</code> responses. The responses are
050: returned in a response list, where each response is associated with its sender.<p>
051: Uses <code>GroupRequest</code>.
052: @param dests The members from which responses are expected. If it is null, replies from all members
053: are expected. The request itself is multicast to all members.
054: @param msg The message to be sent to n members
055: @param mode Defined in <code>GroupRequest</code>. The number of responses to wait for:
056: <ol>
057: <li>GET_FIRST: return the first response received.
058: <li>GET_ALL: wait for all responses (minus the ones from suspected members)
059: <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp size)
060: <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once)
061: <li>GET_N: wait for n responses (may block if n > group size)
062: <li>GET_NONE: wait for no responses, return immediately (non-blocking)
063: </ol>
064: @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses
065: <em>or</em> timeout time.
066: @return RspList A list of responses. Each response is an <code>Object</code> and associated
067: to its sender.
068: */
069: public RspList castMessage(Vector dests, Message msg, int mode,
070: long timeout) {
071: GroupRequest _req = null;
072: Vector real_dests = dests != null ? (Vector) dests.clone()
073: : (Vector) members.clone();
074:
075: // This marks message as sent by us ! (used in up()
076: // msg.addHeader(new MsgProtHeader(getName())); ++ already done by RequestCorrelator
077:
078: _req = new GroupRequest(msg, _corr, real_dests, mode, timeout,
079: 0);
080: try {
081: _req.execute();
082: } catch (Exception e) {
083: throw new RuntimeException("failed executing request "
084: + _req, e);
085: }
086:
087: return _req.getResults();
088: }
089:
090: /**
091: Sends a message to a single member (destination = msg.dest) and returns the response.
092: The message's destination must be non-zero !
093: */
094: public Object sendMessage(Message msg, int mode, long timeout)
095: throws TimeoutException, SuspectedException {
096: Vector mbrs = new Vector();
097: RspList rsp_list = null;
098: Object dest = msg.getDest();
099: Rsp rsp;
100: GroupRequest _req = null;
101:
102: if (dest == null) {
103: System.out
104: .println("MessageProtocol.sendMessage(): the message's destination is null ! "
105: + "Cannot send message !");
106: return null;
107: }
108:
109: mbrs.addElement(dest); // dummy membership (of destination address)
110:
111: _req = new GroupRequest(msg, _corr, mbrs, mode, timeout, 0);
112: try {
113: _req.execute();
114: } catch (Exception e) {
115: throw new RuntimeException("failed executing request "
116: + _req, e);
117: }
118:
119: if (mode == GroupRequest.GET_NONE)
120: return null;
121:
122: rsp_list = _req.getResults();
123:
124: if (rsp_list.size() == 0) {
125: if (log.isErrorEnabled())
126: log.error("response list is empty");
127: return null;
128: }
129: if (rsp_list.size() > 1)
130: if (log.isErrorEnabled())
131: log
132: .error("response list contains "
133: + "more that 1 response; returning first response");
134: rsp = (Rsp) rsp_list.elementAt(0);
135: if (rsp.wasSuspected())
136: throw new SuspectedException(dest);
137: if (!rsp.wasReceived())
138: throw new TimeoutException();
139: return rsp.getValue();
140: }
141:
142: /**
143: Processes a request destined for this layer. The return value is sent as response.
144: */
145: public Object handle(Message req) {
146: System.out
147: .println("MessageProtocol.handle(): this method should be overridden !");
148: return null;
149: }
150:
151: /**
152: * Handle an event coming from the layer above
153: */
154: public final void up(Event evt) {
155: Message msg;
156: Object hdr;
157:
158: switch (evt.getType()) {
159: case Event.VIEW_CHANGE:
160: updateView((View) evt.getArg());
161: break;
162: default:
163: if (!handleUpEvent(evt))
164: return;
165:
166: if (evt.getType() == Event.MSG) {
167: msg = (Message) evt.getArg();
168: hdr = msg.getHeader(getName());
169: if (!(hdr instanceof RequestCorrelator.Header))
170: break;
171: }
172: // [[[ TODO
173: // RequestCorrelator.receive() is currently calling passUp()
174: // itself. Only _this_ method should call passUp()!
175: // So return instead of breaking until fixed (igeorg)
176: // ]]] TODO
177: if (_corr != null) {
178: _corr.receive(evt);
179: return;
180: } else if (log.isWarnEnabled())
181: log.warn("Request correlator is null, evt="
182: + Util.printEvent(evt));
183:
184: break;
185: }
186:
187: passUp(evt);
188: }
189:
190: /**
191: * This message is not originated by this layer, therefore we can just
192: * pass it down without having to go through the request correlator.
193: * We do this ONLY for messages !
194: */
195: public final void down(Event evt) {
196: switch (evt.getType()) {
197: case Event.VIEW_CHANGE:
198: updateView((View) evt.getArg());
199: if (!handleDownEvent(evt))
200: return;
201: break;
202: case Event.MSG:
203: if (!handleDownEvent(evt))
204: return;
205: break;
206: default:
207: if (!handleDownEvent(evt))
208: return;
209: break;
210: }
211:
212: passDown(evt);
213: }
214:
215: protected void updateView(View new_view) {
216: Vector new_mbrs = new_view.getMembers();
217: if (new_mbrs != null) {
218: synchronized (members) {
219: members.removeAllElements();
220: members.addAll(new_mbrs);
221: }
222: }
223: }
224:
225: /**
226: Handle up event. Return false if it should not be passed up the stack.
227: */
228: protected boolean handleUpEvent(Event evt) {
229: // override in subclasses
230: return true;
231: }
232:
233: /**
234: Handle down event. Return false if it should not be passed down the stack.
235: */
236: protected boolean handleDownEvent(Event evt) {
237: // override in subclasses
238: return true;
239: }
240:
241: }
|