001: package org.jgroups.blocks;
002:
003: import org.jgroups.ChannelException;
004:
005: /**
006: * This adapter introduces simple two-phase voting on a specified decree. All
007: * nodes in the group receive a decree in "prepare" phase where they express
008: * their opinion on the decree. If all nodes voted positively on decree, next
009: * phase "commit" fixes changes that were made in "prepare" phase, otherwise
010: * changes are canceled in "abort" phase.
011: *
012: * @author Roman Rokytskyy (rrokytskyy@acm.org)
013: * @author Robert Schaffar-Taurok (robert@fusion.at)
014: * @version $Id: TwoPhaseVotingAdapter.java,v 1.4 2005/06/08 15:56:54 publicnmi Exp $
015: */
016: public class TwoPhaseVotingAdapter {
017:
018: private final VotingAdapter voteChannel;
019:
020: /**
021: * Creats an instance of the class.
022: * @param voteChannel the channel that will be used for voting.
023: */
024: public TwoPhaseVotingAdapter(VotingAdapter voteChannel) {
025: this .voteChannel = voteChannel;
026: }
027:
028: /**
029: * Wraps actual listener with the VoteChannelListener and adds to the
030: * voteChannel
031: */
032: public void addListener(TwoPhaseVotingListener listener) {
033: voteChannel.addVoteListener(new TwoPhaseVoteWrapper(listener));
034: }
035:
036: /**
037: * Removes the listener from the voteChannel
038: */
039: public void removeListener(TwoPhaseVotingListener listener) {
040: voteChannel
041: .removeVoteListener(new TwoPhaseVoteWrapper(listener));
042: }
043:
044: /**
045: * Performs the two-phase voting on the decree. After the voting each
046: * group member remains in the same state as others.
047: */
048: public boolean vote(Object decree, long timeout)
049: throws ChannelException {
050: return vote(decree, timeout, null);
051: }
052:
053: /**
054: * Performs the two-phase voting on the decree. After the voting each
055: * group member remains in the same state as others.
056: */
057: public boolean vote(Object decree, long timeout,
058: VoteResponseProcessor voteResponseProcessor)
059: throws ChannelException {
060: // wrap real decree
061: TwoPhaseWrapper wrappedDecree = new TwoPhaseWrapper(decree);
062:
063: // check the decree acceptance
064: try {
065: if (voteChannel.vote(wrappedDecree, timeout / 3,
066: voteResponseProcessor)) {
067: wrappedDecree.commit();
068:
069: // try to commit decree
070: if (!voteChannel.vote(wrappedDecree, timeout / 3,
071: voteResponseProcessor)) {
072: // strange, should fail during prepare... abort all
073: wrappedDecree.abort();
074: voteChannel.vote(wrappedDecree, timeout / 3,
075: voteResponseProcessor);
076: return false;
077: } else
078: return true;
079:
080: } else {
081: // somebody is not accepting the decree... abort
082: wrappedDecree.abort();
083: voteChannel.vote(wrappedDecree, timeout / 3,
084: voteResponseProcessor);
085: return false;
086: }
087: } catch (ChannelException chex) {
088: wrappedDecree.abort();
089: voteChannel.vote(wrappedDecree, timeout / 3,
090: voteResponseProcessor);
091: throw chex;
092: }
093: }
094:
095: /**
096: * @return Returns the voteChannel.
097: */
098: public VotingAdapter getVoteChannel() {
099: return voteChannel;
100: }
101:
102: public static class TwoPhaseVoteWrapper implements VotingListener {
103:
104: private final TwoPhaseVotingListener listener;
105:
106: public TwoPhaseVoteWrapper(TwoPhaseVotingListener listener) {
107: this .listener = listener;
108: }
109:
110: public boolean vote(Object decree) throws VoteException {
111: if (!(decree instanceof TwoPhaseWrapper))
112: throw new VoteException(
113: "Not my type of decree. Ignore me.");
114:
115: TwoPhaseWrapper wrapper = (TwoPhaseWrapper) decree;
116:
117: // invoke the corresponding operation
118: if (wrapper.isPrepare())
119: return listener.prepare(wrapper.getDecree());
120: else if (wrapper.isCommit())
121: return listener.commit(wrapper.getDecree());
122: else {
123: listener.abort(wrapper.getDecree());
124: return false;
125: }
126: }
127:
128: /*
129:
130: This wrapper is completely equal to the object it wraps.
131:
132: Therefore the hashCode():int and equals(Object):boolean are
133: simply delegated to the wrapped code.
134:
135: */
136:
137: public int hashCode() {
138: return listener.hashCode();
139: }
140:
141: public boolean equals(Object other) {
142: return listener.equals(other);
143: }
144: }
145:
146: /**
147: * Wrapper of the decree to voting decree.
148: */
149: public static class TwoPhaseWrapper implements java.io.Serializable {
150: private static final int PREPARE = 0;
151: private static final int COMMIT = 1;
152: private static final int ABORT = 2;
153:
154: public TwoPhaseWrapper(Object decree) {
155: setDecree(decree);
156: setType(PREPARE);
157: }
158:
159: private Object decree;
160: private int type;
161:
162: public Object getDecree() {
163: return decree;
164: }
165:
166: public void setDecree(Object decree) {
167: this .decree = decree;
168: }
169:
170: private int getType() {
171: return type;
172: }
173:
174: private void setType(int type) {
175: this .type = type;
176: }
177:
178: private boolean isType(int type) {
179: return this .type == type;
180: }
181:
182: public boolean isPrepare() {
183: return isType(PREPARE);
184: }
185:
186: public boolean isCommit() {
187: return isType(COMMIT);
188: }
189:
190: public boolean isAbort() {
191: return isType(ABORT);
192: }
193:
194: public void commit() {
195: setType(COMMIT);
196: }
197:
198: public void abort() {
199: setType(ABORT);
200: }
201:
202: public String toString() {
203: return decree.toString();
204: }
205: }
206: }
|