001: package org.jgroups.blocks;
002:
003: import org.apache.commons.logging.Log;
004: import org.apache.commons.logging.LogFactory;
005: import org.jgroups.*;
006: import org.jgroups.util.Rsp;
007: import org.jgroups.util.RspList;
008:
009: import java.io.Serializable;
010: import java.util.*;
011:
012: /**
013: * Voting adapter provides a voting functionality for an application. There
014: * should be at most one {@link VotingAdapter} listening on one {@link Channel}
015: * instance. Each adapter can have zero or more registered {@link VotingListener}
016: * instances that will be called during voting process.
017: * <p>
018: * Decree is an object that has some semantic meaning within the application.
019: * Each voting listener receives a decree and can respond with either
020: * <code>true</code> or false. If the decree has no meaning for the voting
021: * listener, it is required to throw {@link VoteException}. In this case
022: * this specific listener will be excluded from the voting on the specified
023: * decree. After performing local voting, this voting adapter sends the request
024: * back to the originator of the voting process. Originator receives results
025: * from each node and decides if all voting process succeeded or not depending
026: * on the consensus type specified during voting.
027: *
028: * @author Roman Rokytskyy (rrokytskyy@acm.org)
029: * @author Robert Schaffar-Taurok (robert@fusion.at)
030: * @version $Id: VotingAdapter.java,v 1.10 2006/09/27 12:42:53 belaban Exp $
031: */
032: public class VotingAdapter implements MessageListener,
033: MembershipListener, VoteResponseProcessor {
034:
035: /**
036: * This consensus type means that at least one positive vote is required
037: * for the voting to succeed.
038: */
039: public static final int VOTE_ANY = 0;
040:
041: /**
042: * This consensus type means that at least one positive vote and no negative
043: * votes are required for the voting to succeed.
044: */
045: public static final int VOTE_ALL = 1;
046:
047: /**
048: * This consensus type means that number of positive votes should be greater
049: * than number of negative votes.
050: */
051: public static final int VOTE_MAJORITY = 2;
052:
053: private static final int PROCESS_CONTINUE = 0;
054: private static final int PROCESS_SKIP = 1;
055: private static final int PROCESS_BREAK = 2;
056:
057: private final RpcDispatcher rpcDispatcher;
058:
059: protected final Log log = LogFactory.getLog(getClass());
060:
061: private final HashSet suspectedNodes = new HashSet();
062: private boolean closed;
063:
064: private final List membership_listeners = new LinkedList();
065:
066: /**
067: * Creates an instance of the VoteChannel that uses JGroups
068: * for communication between group members.
069: * @param channel JGroups channel.
070: */
071: public VotingAdapter(Channel channel) {
072: rpcDispatcher = new RpcDispatcher(channel, this , this , this );
073: }
074:
075: public VotingAdapter(PullPushAdapter adapter, Serializable id) {
076: rpcDispatcher = new RpcDispatcher(adapter, id, this , this , this );
077: }
078:
079: public Collection getMembers() {
080: return rpcDispatcher != null ? rpcDispatcher.getMembers()
081: : null;
082: }
083:
084: public void addMembershipListener(MembershipListener l) {
085: if (l != null && !membership_listeners.contains(l))
086: membership_listeners.add(l);
087: }
088:
089: public void removeMembershipListener(MembershipListener l) {
090: if (l != null)
091: membership_listeners.remove(l);
092: }
093:
094: /**
095: * Performs actual voting on the VoteChannel using the JGroups
096: * facilities for communication.
097: */
098: public boolean vote(Object decree, int consensusType, long timeout)
099: throws ChannelException {
100: return vote(decree, consensusType, timeout, null);
101: }
102:
103: /**
104: * Performs actual voting on the VoteChannel using the JGroups
105: * facilities for communication.
106: */
107: public boolean vote(Object decree, int consensusType, long timeout,
108: VoteResponseProcessor voteResponseProcessor)
109: throws ChannelException {
110: if (closed)
111: throw new ChannelException("Channel was closed.");
112:
113: if (log.isDebugEnabled())
114: log.debug("Conducting voting on decree " + decree
115: + ", consensus type "
116: + getConsensusStr(consensusType) + ", timeout "
117: + timeout);
118:
119: int mode = GroupRequest.GET_ALL;
120:
121: // perform the consensus mapping
122: switch (consensusType) {
123: case VotingAdapter.VOTE_ALL:
124: mode = GroupRequest.GET_ALL;
125: break;
126: case VotingAdapter.VOTE_ANY:
127: mode = GroupRequest.GET_FIRST;
128: break;
129: case VotingAdapter.VOTE_MAJORITY:
130: mode = GroupRequest.GET_MAJORITY;
131: break;
132: default:
133: mode = GroupRequest.GET_ALL;
134: }
135:
136: try {
137: java.lang.reflect.Method method = this .getClass()
138: .getMethod("localVote",
139: new Class[] { Object.class });
140:
141: MethodCall methodCall = new MethodCall(method,
142: new Object[] { decree });
143:
144: if (log.isDebugEnabled())
145: log.debug("Calling remote methods...");
146:
147: // vote
148: RspList responses = rpcDispatcher.callRemoteMethods(null,
149: methodCall, mode, timeout);
150:
151: if (log.isDebugEnabled())
152: log.debug("Checking responses.");
153:
154: if (voteResponseProcessor == null) {
155: voteResponseProcessor = this ;
156: }
157:
158: return voteResponseProcessor.processResponses(responses,
159: consensusType, decree);
160: } catch (NoSuchMethodException nsmex) {
161:
162: // UPS!!! How can this happen?!
163:
164: if (log.isErrorEnabled())
165: log.error("Could not find method localVote(Object). "
166: + nsmex.toString());
167:
168: throw new UnsupportedOperationException(
169: "Cannot execute voting because of absence of "
170: + this .getClass().getName()
171: + ".localVote(Object) method.");
172: }
173: }
174:
175: /**
176: * Processes the response list and makes a decision according to the
177: * type of the consensus for current voting.
178: * <p>
179: * Note: we do not support voting in case of Byzantine failures, i.e.
180: * when the node responds with the fault message.
181: */
182: public boolean processResponses(RspList responses,
183: int consensusType, Object decree) throws ChannelException {
184: if (responses == null) {
185: return false;
186: }
187:
188: boolean voteResult = false;
189: int totalPositiveVotes = 0;
190: int totalNegativeVotes = 0;
191:
192: for (Iterator it = responses.values().iterator(); it.hasNext();) {
193: Rsp response = (Rsp) it.next();
194:
195: switch (checkResponse(response)) {
196: case PROCESS_SKIP:
197: continue;
198: case PROCESS_BREAK:
199: return false;
200: }
201:
202: VoteResult result = (VoteResult) response.getValue();
203:
204: totalPositiveVotes += result.getPositiveVotes();
205: totalNegativeVotes += result.getNegativeVotes();
206: }
207:
208: switch (consensusType) {
209: case VotingAdapter.VOTE_ALL:
210: voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0);
211: break;
212: case VotingAdapter.VOTE_ANY:
213: voteResult = (totalPositiveVotes > 0);
214: break;
215: case VotingAdapter.VOTE_MAJORITY:
216: voteResult = (totalPositiveVotes > totalNegativeVotes);
217: }
218:
219: return voteResult;
220: }
221:
222: /**
223: * This method checks the response and says the processResponses() method
224: * what to do.
225: * @return PROCESS_CONTINUE to continue calculating votes,
226: * PROCESS_BREAK to stop calculating votes from the nodes,
227: * PROCESS_SKIP to skip current response.
228: * @throws ChannelException when the response is fatal to the
229: * current voting process.
230: */
231: private int checkResponse(Rsp response) throws ChannelException {
232:
233: if (!response.wasReceived()) {
234:
235: if (log.isDebugEnabled())
236: log.debug("Response from node " + response.getSender()
237: + " was not received.");
238:
239: // what do we do when one node failed to respond?
240: //throw new ChannelException("Node " + response.GetSender() +
241: // " failed to respond.");
242: return PROCESS_BREAK;
243: }
244:
245: /**@todo check what to do here */
246: if (response.wasSuspected()) {
247: if (log.isDebugEnabled())
248: log.debug("Node " + response.getSender()
249: + " was suspected.");
250:
251: // wat do we do when one node is suspected?
252: return PROCESS_SKIP;
253: }
254:
255: Object object = response.getValue();
256:
257: // we received exception/error, something went wrong
258: // on one of the nodes... and we do not handle such faults
259: if (object instanceof Throwable) {
260: throw new ChannelException("Node " + response.getSender()
261: + " is faulty.");
262: }
263:
264: if (object == null) {
265: return PROCESS_SKIP;
266: }
267:
268: // it is always interesting to know the class that caused failure...
269: if (!(object instanceof VoteResult)) {
270: String faultClass = object.getClass().getName();
271:
272: // ...but we do not handle byzantine faults
273: throw new ChannelException("Node " + response.getSender()
274: + " generated fault (class " + faultClass + ')');
275: }
276:
277: // what if we received the response from faulty node?
278: if (object instanceof FailureVoteResult) {
279:
280: if (log.isErrorEnabled())
281: log.error(((FailureVoteResult) object).getReason());
282:
283: return PROCESS_BREAK;
284: }
285:
286: // everything is fine :)
287: return PROCESS_CONTINUE;
288: }
289:
290: /**
291: * Callback for notification about the new view of the group.
292: */
293: public void viewAccepted(View newView) {
294:
295: // clean nodes that were suspected but still exist in new view
296: Iterator iterator = suspectedNodes.iterator();
297: while (iterator.hasNext()) {
298: Address suspectedNode = (Address) iterator.next();
299: if (newView.containsMember(suspectedNode))
300: iterator.remove();
301: }
302:
303: for (Iterator it = membership_listeners.iterator(); it
304: .hasNext();) {
305: MembershipListener listener = (MembershipListener) it
306: .next();
307: try {
308: listener.viewAccepted(newView);
309: } catch (Throwable t) {
310: if (log.isErrorEnabled())
311: log.error("failed calling viewAccepted() on "
312: + listener, t);
313: }
314: }
315: }
316:
317: /**
318: * Callback for notification that one node is suspected
319: */
320: public void suspect(Address suspected) {
321: suspectedNodes.add(suspected);
322: for (Iterator it = membership_listeners.iterator(); it
323: .hasNext();) {
324: MembershipListener listener = (MembershipListener) it
325: .next();
326: try {
327: listener.suspect(suspected);
328: } catch (Throwable t) {
329: if (log.isErrorEnabled())
330: log.error(
331: "failed calling suspect() on " + listener,
332: t);
333: }
334: }
335: }
336:
337: /**
338: * Blocks the channel until the ViewAccepted is invoked.
339: */
340: public void block() {
341: for (Iterator it = membership_listeners.iterator(); it
342: .hasNext();) {
343: MembershipListener listener = (MembershipListener) it
344: .next();
345: try {
346: listener.block();
347: } catch (Throwable t) {
348: if (log.isErrorEnabled())
349: log.error("failed calling block() on " + listener,
350: t);
351: }
352: }
353: }
354:
355: /**
356: * Get the channel state.
357: *
358: * @return always <code>null</code>, we do not have any group-shared
359: * state.
360: */
361: public byte[] getState() {
362: return null;
363: }
364:
365: /**
366: * Receive the message. All messages are ignored.
367: *
368: * @param msg message to check.
369: */
370: public void receive(org.jgroups.Message msg) {
371: // do nothing
372: }
373:
374: /**
375: * Set the channel state. We do nothing here.
376: */
377: public void setState(byte[] state) {
378: // ignore the state, we do not have any.
379: }
380:
381: private final Set voteListeners = new HashSet();
382: private VotingListener[] listeners;
383:
384: /**
385: * Vote on the specified decree requiring all nodes to vote.
386: *
387: * @param decree decree on which nodes should vote.
388: * @param timeout time during which nodes can vote.
389: *
390: * @return <code>true</code> if nodes agreed on a decree, otherwise
391: * <code>false</code>
392: *
393: * @throws ChannelException if something went wrong.
394: */
395: public boolean vote(Object decree, long timeout)
396: throws ChannelException {
397: return vote(decree, timeout, null);
398: }
399:
400: /**
401: * Vote on the specified decree requiring all nodes to vote.
402: *
403: * @param decree decree on which nodes should vote.
404: * @param timeout time during which nodes can vote.
405: * @param voteResponseProcessor processor which will be called for every response that is received.
406: *
407: * @return <code>true</code> if nodes agreed on a decree, otherwise
408: * <code>false</code>
409: *
410: * @throws ChannelException if something went wrong.
411: */
412: public boolean vote(Object decree, long timeout,
413: VoteResponseProcessor voteResponseProcessor)
414: throws ChannelException {
415: return vote(decree, VOTE_ALL, timeout, voteResponseProcessor);
416: }
417:
418: /**
419: * Adds voting listener.
420: */
421: public void addVoteListener(VotingListener listener) {
422: voteListeners.add(listener);
423: listeners = (VotingListener[]) voteListeners
424: .toArray(new VotingListener[voteListeners.size()]);
425: }
426:
427: /**
428: * Removes voting listener.
429: */
430: public void removeVoteListener(VotingListener listener) {
431: voteListeners.remove(listener);
432:
433: listeners = (VotingListener[]) voteListeners
434: .toArray(new VotingListener[voteListeners.size()]);
435: }
436:
437: /**
438: * This method performs voting on the specific decree between all
439: * local voteListeners.
440: */
441: public VoteResult localVote(Object decree) {
442:
443: VoteResult voteResult = new VoteResult();
444:
445: for (int i = 0; i < listeners.length; i++) {
446: VotingListener listener = listeners[i];
447:
448: try {
449: voteResult.addVote(listener.vote(decree));
450: } catch (VoteException vex) {
451: // do nothing here.
452: } catch (RuntimeException ex) {
453:
454: if (log.isErrorEnabled())
455: log.error(ex.toString());
456:
457: // if we are here, then listener
458: // had thrown a RuntimeException
459: return new FailureVoteResult(ex.getMessage());
460: }
461: }
462:
463: if (log.isDebugEnabled())
464: log.debug("Voting on decree " + decree.toString() + " : "
465: + voteResult.toString());
466:
467: return voteResult;
468: }
469:
470: /**
471: * Convert consensus type into string representation. This method is
472: * useful for debugginf.
473: *
474: * @param consensusType type of the consensus.
475: *
476: * @return string representation of the consensus type.
477: */
478: public static String getConsensusStr(int consensusType) {
479: switch (consensusType) {
480: case VotingAdapter.VOTE_ALL:
481: return "VOTE_ALL";
482: case VotingAdapter.VOTE_ANY:
483: return "VOTE_ANY";
484: case VotingAdapter.VOTE_MAJORITY:
485: return "VOTE_MAJORITY";
486: default:
487: return "UNKNOWN";
488: }
489: }
490:
491: /**
492: * This class represents the result of local voting. It contains a
493: * number of positive and negative votes collected during local voting.
494: */
495: public static class VoteResult implements Serializable {
496: private int positiveVotes = 0;
497: private int negativeVotes = 0;
498: private static final long serialVersionUID = 2868605599965196746L;
499:
500: public void addVote(boolean vote) {
501: if (vote)
502: positiveVotes++;
503: else
504: negativeVotes++;
505: }
506:
507: public int getPositiveVotes() {
508: return positiveVotes;
509: }
510:
511: public int getNegativeVotes() {
512: return negativeVotes;
513: }
514:
515: public String toString() {
516: return "VoteResult: up=" + positiveVotes + ", down="
517: + negativeVotes;
518: }
519: }
520:
521: /**
522: * Class that represents a result of local voting on the failed node.
523: */
524: public static class FailureVoteResult extends VoteResult {
525: private final String reason;
526:
527: public FailureVoteResult(String reason) {
528: this .reason = reason;
529: }
530:
531: public String getReason() {
532: return reason;
533: }
534: }
535:
536: }
|