001: // $Id: STATE_TRANSFER.java,v 1.21.6.1 2007/04/27 08:03:50 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.Message;
008: import org.jgroups.View;
009: import org.jgroups.blocks.GroupRequest;
010: import org.jgroups.blocks.RequestCorrelator;
011: import org.jgroups.blocks.RequestHandler;
012: import org.jgroups.stack.Protocol;
013: import org.jgroups.stack.StateTransferInfo;
014: import org.jgroups.util.Rsp;
015: import org.jgroups.util.RspList;
016: import org.jgroups.util.Util;
017:
018: import java.io.Serializable;
019: import java.util.HashMap;
020: import java.util.Properties;
021: import java.util.Vector;
022:
/**
 * Request payload exchanged between STATE_TRANSFER layers. A request carries a type
 * ({@link #MAKE_COPY} or {@link #RETURN_STATE}) and one argument, which for both types is
 * the originator of the request. Instances are immutable once constructed.
 */
class StateTransferRequest implements Serializable {
    /** Request type: make a copy of the state; arg = originator of request. */
    static final int MAKE_COPY = 1;
    /** Request type: return the state; arg = originator of request. */
    static final int RETURN_STATE = 2;

    private static final long serialVersionUID = -7734608266762273116L;

    // Both fields are final: the kind and argument of a request never change after creation.
    final int type;
    final Object arg;

    /**
     * @param type one of {@link #MAKE_COPY} or {@link #RETURN_STATE}
     * @param arg  the originator of the request
     */
    StateTransferRequest(int type, Object arg) {
        this.type = type;
        this.arg = arg;
    }

    /** Returns the request type as passed to the constructor. */
    public int getType() {
        return type;
    }

    /** Returns the request argument as passed to the constructor (may be null). */
    public Object getArg() {
        return arg;
    }

    /** Returns a readable form such as {@code [StateTransferRequest: type=MAKE_COPY, arg=...]}. */
    public String toString() {
        return "[StateTransferRequest: type=" + type2Str(type)
                + ", arg=" + arg + ']';
    }

    /**
     * Maps a request type constant to its symbolic name.
     *
     * @param t the type constant
     * @return "MAKE_COPY", "RETURN_STATE", or "&lt;unknown&gt;" for any other value
     */
    static String type2Str(int t) {
        switch (t) {
            case MAKE_COPY:
                return "MAKE_COPY";
            case RETURN_STATE:
                return "RETURN_STATE";
            default:
                return "<unknown>";
        }
    }
}
060:
061: /**
062: * State transfer layer. Upon receiving a GET_STATE event from JChannel, a MAKE_COPY message is
063: * sent to all members. When the originator receives MAKE_COPY, it queues all messages to the
064: * channel.
065: * When another member receives the message, it asks the JChannel to provide it with a copy of
066: * the current state (GetStateEvent is received by application, returnState() sends state down the
067: * stack). Then the current layer sends a unicast RETURN_STATE message to the coordinator, which
068: * returns the cached copy.
069: * When the state is received by the originator, the GET_STATE sender is unblocked with a
070: * GET_STATE_OK event up the stack (unless it already timed out).<p>
071: * Requires QUEUE layer on top.
072: *
073: * @author Bela Ban
074: */
075: public class STATE_TRANSFER extends Protocol implements RequestHandler {
076: Address local_addr = null;
077: final Vector members = new Vector(11);
078: final Message m = null;
079: boolean is_server = false;
080: byte[] cached_state = null;
081: final Object state_xfer_mutex = new Object(); // get state from appl (via channel).
082: long timeout_get_appl_state = 5000;
083: long timeout_return_state = 5000;
084: RequestCorrelator corr = null;
085: final Vector observers = new Vector(5);
086: final HashMap map = new HashMap(7);
087:
088: /**
089: * All protocol names have to be unique !
090: */
091: public String getName() {
092: return "STATE_TRANSFER";
093: }
094:
095: public void init() throws Exception {
096: map.put("state_transfer", Boolean.TRUE);
097: map.put("protocol_class", getClass().getName());
098:
099: }
100:
101: public void start() throws Exception {
102: corr = new RequestCorrelator(getName(), this , this );
103: passUp(new Event(Event.CONFIG, map));
104: }
105:
106: public void stop() {
107: if (corr != null) {
108: corr.stop();
109: corr = null;
110: }
111: }
112:
113: public boolean setProperties(Properties props) {
114: String str;
115:
116: super .setProperties(props);
117: // Milliseconds to wait for application to provide requested state, events are
118: // STATE_TRANSFER up and STATE_TRANSFER_OK down
119: str = props.getProperty("timeout_get_appl_state");
120: if (str != null) {
121: timeout_get_appl_state = Long.parseLong(str);
122: props.remove("timeout_get_appl_state");
123: }
124:
125: // Milliseconds to wait for 1 or all members to return its/their state. 0 means wait
126: // forever. States are retrieved using GroupRequest/RequestCorrelator
127: str = props.getProperty("timeout_return_state");
128: if (str != null) {
129: timeout_return_state = Long.parseLong(str);
130: props.remove("timeout_return_state");
131: }
132:
133: if (props.size() > 0) {
134: log
135: .error("STATE_TRANSFER.setProperties(): the following properties are not recognized: "
136: + props);
137:
138: return false;
139: }
140: return true;
141: }
142:
143: public Vector requiredUpServices() {
144: Vector ret = new Vector(2);
145: ret.addElement(new Integer(Event.START_QUEUEING));
146: ret.addElement(new Integer(Event.STOP_QUEUEING));
147: return ret;
148: }
149:
150: public void up(Event evt) {
151: switch (evt.getType()) {
152:
153: case Event.BECOME_SERVER:
154: is_server = true;
155: break;
156:
157: case Event.SET_LOCAL_ADDRESS:
158: local_addr = (Address) evt.getArg();
159: break;
160:
161: case Event.TMP_VIEW:
162: case Event.VIEW_CHANGE:
163: Vector new_members = ((View) evt.getArg()).getMembers();
164: synchronized (members) {
165: members.removeAllElements();
166: if (new_members != null && new_members.size() > 0)
167: for (int k = 0; k < new_members.size(); k++)
168: members.addElement(new_members.elementAt(k));
169: }
170: break;
171: }
172:
173: if (corr != null)
174: corr.receive(evt); // will consume or pass up, depending on header
175: else
176: passUp(evt);
177: }
178:
179: public void down(Event evt) {
180: Object coord, state;
181: Vector event_list;
182: StateTransferInfo info;
183:
184: switch (evt.getType()) {
185:
186: case Event.TMP_VIEW:
187: case Event.VIEW_CHANGE:
188: Vector new_members = ((View) evt.getArg()).getMembers();
189: synchronized (members) {
190: members.removeAllElements();
191: if (new_members != null && new_members.size() > 0)
192: for (int k = 0; k < new_members.size(); k++)
193: members.addElement(new_members.elementAt(k));
194: }
195: break;
196:
197: case Event.GET_STATE: // generated by JChannel.getState()
198: info = (StateTransferInfo) evt.getArg();
199: coord = determineCoordinator();
200:
201: if (coord == null || coord.equals(local_addr)) {
202: event_list = new Vector(1);
203: event_list.addElement(new Event(Event.GET_STATE_OK,
204: new StateTransferInfo()));
205: passUp(new Event(Event.STOP_QUEUEING, event_list));
206: return; // don't pass down any further !
207: }
208:
209: try {
210: sendMakeCopyMessage(); // multicast MAKE_COPY to all members (including me)
211: state = getStateFromSingle(info.target);
212: } catch (Throwable t) {
213: if (log.isErrorEnabled())
214: log.error("failed sending state request", t);
215: state = null;
216: }
217:
218: /* Pass up the state to the application layer (insert into JChannel's event queue */
219: event_list = new Vector(1);
220: event_list.addElement(new Event(Event.GET_STATE_OK,
221: new StateTransferInfo(null, info.state_id, 0L,
222: (byte[]) state)));
223:
224: /* Now stop queueing */
225: passUp(new Event(Event.STOP_QUEUEING, event_list));
226: return; // don't pass down any further !
227:
228: case Event.GET_APPLSTATE_OK:
229: synchronized (state_xfer_mutex) {
230: info = (StateTransferInfo) evt.getArg();
231: cached_state = info.state;
232: state_xfer_mutex.notifyAll();
233: }
234: return; // don't pass down any further !
235:
236: }
237:
238: passDown(evt); // pass on to the layer below us
239: }
240:
241: /* ---------------------- Interface RequestHandler -------------------------- */
242: public Object handle(Message msg) {
243: StateTransferRequest req;
244:
245: try {
246: req = (StateTransferRequest) msg.getObject();
247:
248: switch (req.getType()) {
249: case StateTransferRequest.MAKE_COPY:
250: makeCopy(req.getArg());
251: return null;
252: case StateTransferRequest.RETURN_STATE:
253: if (is_server)
254: return cached_state;
255: else {
256: if (log.isWarnEnabled())
257: log
258: .warn("RETURN_STATE: returning null"
259: + "as I'm not yet an operational state server !");
260: return null;
261: }
262: default:
263: if (log.isErrorEnabled())
264: log.error("type " + req.getType()
265: + "is unknown in StateTransferRequest !");
266: return null;
267: }
268: } catch (Exception e) {
269: if (log.isErrorEnabled())
270: log.error("exception is " + e);
271: return null;
272: }
273: }
274:
275: /* ------------------- End of Interface RequestHandler ---------------------- */
276:
277: byte[] getStateFromSingle(Address target) throws Throwable {
278: Vector dests = new Vector(11);
279: Message msg;
280: StateTransferRequest r = new StateTransferRequest(
281: StateTransferRequest.RETURN_STATE, local_addr);
282: RspList rsp_list;
283: Rsp rsp;
284: Address dest;
285: GroupRequest req;
286: int num_tries = 0;
287:
288: try {
289: msg = new Message(null, null, Util.objectToByteBuffer(r));
290: } catch (Exception e) {
291: if (log.isErrorEnabled())
292: log.error("exception=" + e);
293: return null;
294: }
295:
296: while (members.size() > 1 && num_tries++ < 3) { // excluding myself
297: dest = target != null ? target : determineCoordinator();
298: if (dest == null)
299: return null;
300: msg.setDest(dest);
301: dests.removeAllElements();
302: dests.addElement(dest);
303: req = new GroupRequest(msg, corr, dests,
304: GroupRequest.GET_FIRST, timeout_return_state, 0);
305: req.execute();
306: rsp_list = req.getResults();
307: for (int i = 0; i < rsp_list.size(); i++) { // get the first non-suspected result
308: rsp = (Rsp) rsp_list.elementAt(i);
309: if (rsp.wasReceived())
310: return (byte[]) rsp.getValue();
311: }
312: Util.sleep(1000);
313: }
314:
315: return null;
316: }
317:
318: // Vector getStateFromMany(Vector targets) {
319: // Vector dests=new Vector(11);
320: // Message msg;
321: // StateTransferRequest r=new StateTransferRequest(StateTransferRequest.RETURN_STATE, local_addr);
322: // RspList rsp_list;
323: // GroupRequest req;
324: // int i;
325: //
326: //
327: // if(targets != null) {
328: // for(i=0; i < targets.size(); i++)
329: // if(!local_addr.equals(targets.elementAt(i)))
330: // dests.addElement(targets.elementAt(i));
331: // }
332: // else {
333: // for(i=0; i < members.size(); i++)
334: // if(!local_addr.equals(members.elementAt(i)))
335: // dests.addElement(members.elementAt(i));
336: // }
337: //
338: // if(dests.size() == 0)
339: // return null;
340: //
341: // msg=new Message();
342: // try {
343: // msg.setBuffer(Util.objectToByteBuffer(r));
344: // }
345: // catch(Exception e) {
346: // }
347: //
348: // req=new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL, timeout_return_state, 0);
349: // req.execute();
350: // rsp_list=req.getResults();
351: // return rsp_list.getResults();
352: // }
353:
354: void sendMakeCopyMessage() throws Throwable {
355: GroupRequest req;
356: Message msg = new Message();
357: StateTransferRequest r = new StateTransferRequest(
358: StateTransferRequest.MAKE_COPY, local_addr);
359: Vector dests = new Vector(11);
360:
361: for (int i = 0; i < members.size(); i++)
362: dests.addElement(members.elementAt(i));
363:
364: if (dests.size() == 0)
365: return;
366:
367: try {
368: msg.setBuffer(Util.objectToByteBuffer(r));
369: } catch (Exception e) {
370: }
371:
372: req = new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL,
373: timeout_return_state, 0);
374: req.execute();
375: }
376:
377: /**
378: * Return the first element of members which is not me. Otherwise return null.
379: */
380: Address determineCoordinator() {
381: Address ret = null;
382: if (members != null && members.size() > 1) {
383: for (int i = 0; i < members.size(); i++)
384: if (!local_addr.equals(members.elementAt(i)))
385: return (Address) members.elementAt(i);
386: }
387: return ret;
388: }
389:
390: /**
391: * If server, ask application to send us a copy of its state (STATE_TRANSFER up,
392: * STATE_TRANSFER down). If client, start queueing events. Queuing will be stopped when
393: * state has been retrieved (or not) from single or all member(s).
394: */
395: void makeCopy(Object sender) {
396: if (sender.equals(local_addr)) { // was sent by us, has to start queueing
397: passUp(new Event(Event.START_QUEUEING));
398: } else { // only retrieve state from appl when not in client state anymore
399: if (is_server) { // get state from application and store it locally
400: synchronized (state_xfer_mutex) {
401: cached_state = null;
402: StateTransferInfo info = new StateTransferInfo(
403: local_addr);
404: passUp(new Event(Event.GET_APPLSTATE, info));
405: if (cached_state == null) {
406: try {
407: state_xfer_mutex
408: .wait(timeout_get_appl_state); // wait for STATE_TRANSFER_OK
409: } catch (Exception e) {
410: }
411: }
412: }
413: }
414: }
415: }
416:
417: }
|