Source Code Cross Referenced for VotingAdapter.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » blocks » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.blocks 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jgroups.blocks;
002:
003:        import org.apache.commons.logging.Log;
004:        import org.apache.commons.logging.LogFactory;
005:        import org.jgroups.*;
006:        import org.jgroups.util.Rsp;
007:        import org.jgroups.util.RspList;
008:
009:        import java.io.Serializable;
010:        import java.util.*;
011:
012:        /**
013:         * Voting adapter provides a voting functionality for an application. There 
014:         * should be at most one {@link VotingAdapter} listening on one {@link Channel}
015:         * instance. Each adapter can have zero or more registered {@link VotingListener} 
016:         * instances that will be called during voting process. 
017:         * <p>
018:         * Decree is an object that has some semantic meaning within the application. 
019:         * Each voting listener receives a decree and can respond with either 
020:         * <code>true</code> or false. If the decree has no meaning for the voting
021:         * listener, it is required to throw {@link VoteException}. In this case
022:         * this specific listener will be excluded from the voting on the specified
023:         * decree. After performing local voting, this voting adapter sends the request
024:         * back to the originator of the voting process. Originator receives results
025:         * from each node and decides if all voting process succeeded or not depending 
026:         * on the consensus type specified during voting.
027:         * 
028:         * @author Roman Rokytskyy (rrokytskyy@acm.org)
029:         * @author Robert Schaffar-Taurok (robert@fusion.at)
030:         * @version $Id: VotingAdapter.java,v 1.10 2006/09/27 12:42:53 belaban Exp $
031:         */
032:        public class VotingAdapter implements  MessageListener,
033:                MembershipListener, VoteResponseProcessor {
034:
035:            /**
036:             * This consensus type means that at least one positive vote is required
037:             * for the voting to succeed.
038:             */
039:            public static final int VOTE_ANY = 0;
040:
041:            /**
042:             * This consensus type means that at least one positive vote and no negative
043:             * votes are required for the voting to succeed.
044:             */
045:            public static final int VOTE_ALL = 1;
046:
047:            /**
048:             * This consensus type means that number of positive votes should be greater
049:             * than number of negative votes.
050:             */
051:            public static final int VOTE_MAJORITY = 2;
052:
053:            private static final int PROCESS_CONTINUE = 0;
054:            private static final int PROCESS_SKIP = 1;
055:            private static final int PROCESS_BREAK = 2;
056:
057:            private final RpcDispatcher rpcDispatcher;
058:
059:            protected final Log log = LogFactory.getLog(getClass());
060:
061:            private final HashSet suspectedNodes = new HashSet();
062:            private boolean closed;
063:
064:            private final List membership_listeners = new LinkedList();
065:
066:            /**
067:             * Creates an instance of the VoteChannel that uses JGroups
068:             * for communication between group members.
069:             * @param channel JGroups channel.
070:             */
071:            public VotingAdapter(Channel channel) {
072:                rpcDispatcher = new RpcDispatcher(channel, this , this , this );
073:            }
074:
075:            public VotingAdapter(PullPushAdapter adapter, Serializable id) {
076:                rpcDispatcher = new RpcDispatcher(adapter, id, this , this , this );
077:            }
078:
079:            public Collection getMembers() {
080:                return rpcDispatcher != null ? rpcDispatcher.getMembers()
081:                        : null;
082:            }
083:
084:            public void addMembershipListener(MembershipListener l) {
085:                if (l != null && !membership_listeners.contains(l))
086:                    membership_listeners.add(l);
087:            }
088:
089:            public void removeMembershipListener(MembershipListener l) {
090:                if (l != null)
091:                    membership_listeners.remove(l);
092:            }
093:
094:            /**
095:             * Performs actual voting on the VoteChannel using the JGroups
096:             * facilities for communication.
097:             */
098:            public boolean vote(Object decree, int consensusType, long timeout)
099:                    throws ChannelException {
100:                return vote(decree, consensusType, timeout, null);
101:            }
102:
103:            /**
104:             * Performs actual voting on the VoteChannel using the JGroups
105:             * facilities for communication.
106:             */
107:            public boolean vote(Object decree, int consensusType, long timeout,
108:                    VoteResponseProcessor voteResponseProcessor)
109:                    throws ChannelException {
110:                if (closed)
111:                    throw new ChannelException("Channel was closed.");
112:
113:                if (log.isDebugEnabled())
114:                    log.debug("Conducting voting on decree " + decree
115:                            + ", consensus type "
116:                            + getConsensusStr(consensusType) + ", timeout "
117:                            + timeout);
118:
119:                int mode = GroupRequest.GET_ALL;
120:
121:                // perform the consensus mapping
122:                switch (consensusType) {
123:                case VotingAdapter.VOTE_ALL:
124:                    mode = GroupRequest.GET_ALL;
125:                    break;
126:                case VotingAdapter.VOTE_ANY:
127:                    mode = GroupRequest.GET_FIRST;
128:                    break;
129:                case VotingAdapter.VOTE_MAJORITY:
130:                    mode = GroupRequest.GET_MAJORITY;
131:                    break;
132:                default:
133:                    mode = GroupRequest.GET_ALL;
134:                }
135:
136:                try {
137:                    java.lang.reflect.Method method = this .getClass()
138:                            .getMethod("localVote",
139:                                    new Class[] { Object.class });
140:
141:                    MethodCall methodCall = new MethodCall(method,
142:                            new Object[] { decree });
143:
144:                    if (log.isDebugEnabled())
145:                        log.debug("Calling remote methods...");
146:
147:                    // vote
148:                    RspList responses = rpcDispatcher.callRemoteMethods(null,
149:                            methodCall, mode, timeout);
150:
151:                    if (log.isDebugEnabled())
152:                        log.debug("Checking responses.");
153:
154:                    if (voteResponseProcessor == null) {
155:                        voteResponseProcessor = this ;
156:                    }
157:
158:                    return voteResponseProcessor.processResponses(responses,
159:                            consensusType, decree);
160:                } catch (NoSuchMethodException nsmex) {
161:
162:                    // UPS!!! How can this happen?!
163:
164:                    if (log.isErrorEnabled())
165:                        log.error("Could not find method localVote(Object). "
166:                                + nsmex.toString());
167:
168:                    throw new UnsupportedOperationException(
169:                            "Cannot execute voting because of absence of "
170:                                    + this .getClass().getName()
171:                                    + ".localVote(Object) method.");
172:                }
173:            }
174:
175:            /**
176:             * Processes the response list and makes a decision according to the
177:             * type of the consensus for current voting.
178:             * <p>
179:             * Note: we do not support voting in case of Byzantine failures, i.e.
180:             * when the node responds with the fault message.
181:             */
182:            public boolean processResponses(RspList responses,
183:                    int consensusType, Object decree) throws ChannelException {
184:                if (responses == null) {
185:                    return false;
186:                }
187:
188:                boolean voteResult = false;
189:                int totalPositiveVotes = 0;
190:                int totalNegativeVotes = 0;
191:
192:                for (Iterator it = responses.values().iterator(); it.hasNext();) {
193:                    Rsp response = (Rsp) it.next();
194:
195:                    switch (checkResponse(response)) {
196:                    case PROCESS_SKIP:
197:                        continue;
198:                    case PROCESS_BREAK:
199:                        return false;
200:                    }
201:
202:                    VoteResult result = (VoteResult) response.getValue();
203:
204:                    totalPositiveVotes += result.getPositiveVotes();
205:                    totalNegativeVotes += result.getNegativeVotes();
206:                }
207:
208:                switch (consensusType) {
209:                case VotingAdapter.VOTE_ALL:
210:                    voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0);
211:                    break;
212:                case VotingAdapter.VOTE_ANY:
213:                    voteResult = (totalPositiveVotes > 0);
214:                    break;
215:                case VotingAdapter.VOTE_MAJORITY:
216:                    voteResult = (totalPositiveVotes > totalNegativeVotes);
217:                }
218:
219:                return voteResult;
220:            }
221:
222:            /**
223:             * This method checks the response and says the processResponses() method
224:             * what to do.
225:             * @return PROCESS_CONTINUE to continue calculating votes,
226:             * PROCESS_BREAK to stop calculating votes from the nodes,
227:             * PROCESS_SKIP to skip current response.
228:             * @throws ChannelException when the response is fatal to the
229:             * current voting process.
230:             */
231:            private int checkResponse(Rsp response) throws ChannelException {
232:
233:                if (!response.wasReceived()) {
234:
235:                    if (log.isDebugEnabled())
236:                        log.debug("Response from node " + response.getSender()
237:                                + " was not received.");
238:
239:                    // what do we do when one node failed to respond?
240:                    //throw new ChannelException("Node " + response.GetSender() +
241:                    //	" failed to respond.");
242:                    return PROCESS_BREAK;
243:                }
244:
245:                /**@todo check what to do here */
246:                if (response.wasSuspected()) {
247:                    if (log.isDebugEnabled())
248:                        log.debug("Node " + response.getSender()
249:                                + " was suspected.");
250:
251:                    // wat do we do when one node is suspected?
252:                    return PROCESS_SKIP;
253:                }
254:
255:                Object object = response.getValue();
256:
257:                // we received exception/error, something went wrong
258:                // on one of the nodes... and we do not handle such faults
259:                if (object instanceof  Throwable) {
260:                    throw new ChannelException("Node " + response.getSender()
261:                            + " is faulty.");
262:                }
263:
264:                if (object == null) {
265:                    return PROCESS_SKIP;
266:                }
267:
268:                // it is always interesting to know the class that caused failure...
269:                if (!(object instanceof  VoteResult)) {
270:                    String faultClass = object.getClass().getName();
271:
272:                    // ...but we do not handle byzantine faults
273:                    throw new ChannelException("Node " + response.getSender()
274:                            + " generated fault (class " + faultClass + ')');
275:                }
276:
277:                // what if we received the response from faulty node?
278:                if (object instanceof  FailureVoteResult) {
279:
280:                    if (log.isErrorEnabled())
281:                        log.error(((FailureVoteResult) object).getReason());
282:
283:                    return PROCESS_BREAK;
284:                }
285:
286:                // everything is fine :)
287:                return PROCESS_CONTINUE;
288:            }
289:
290:            /**
291:             * Callback for notification about the new view of the group.
292:             */
293:            public void viewAccepted(View newView) {
294:
295:                // clean nodes that were suspected but still exist in new view
296:                Iterator iterator = suspectedNodes.iterator();
297:                while (iterator.hasNext()) {
298:                    Address suspectedNode = (Address) iterator.next();
299:                    if (newView.containsMember(suspectedNode))
300:                        iterator.remove();
301:                }
302:
303:                for (Iterator it = membership_listeners.iterator(); it
304:                        .hasNext();) {
305:                    MembershipListener listener = (MembershipListener) it
306:                            .next();
307:                    try {
308:                        listener.viewAccepted(newView);
309:                    } catch (Throwable t) {
310:                        if (log.isErrorEnabled())
311:                            log.error("failed calling viewAccepted() on "
312:                                    + listener, t);
313:                    }
314:                }
315:            }
316:
317:            /**
318:             * Callback for notification that one node is suspected
319:             */
320:            public void suspect(Address suspected) {
321:                suspectedNodes.add(suspected);
322:                for (Iterator it = membership_listeners.iterator(); it
323:                        .hasNext();) {
324:                    MembershipListener listener = (MembershipListener) it
325:                            .next();
326:                    try {
327:                        listener.suspect(suspected);
328:                    } catch (Throwable t) {
329:                        if (log.isErrorEnabled())
330:                            log.error(
331:                                    "failed calling suspect() on " + listener,
332:                                    t);
333:                    }
334:                }
335:            }
336:
337:            /**
338:             * Blocks the channel until the ViewAccepted is invoked.
339:             */
340:            public void block() {
341:                for (Iterator it = membership_listeners.iterator(); it
342:                        .hasNext();) {
343:                    MembershipListener listener = (MembershipListener) it
344:                            .next();
345:                    try {
346:                        listener.block();
347:                    } catch (Throwable t) {
348:                        if (log.isErrorEnabled())
349:                            log.error("failed calling block() on " + listener,
350:                                    t);
351:                    }
352:                }
353:            }
354:
355:            /**
356:             * Get the channel state.
357:             *
358:             * @return always <code>null</code>, we do not have any group-shared
359:             * state.
360:             */
361:            public byte[] getState() {
362:                return null;
363:            }
364:
365:            /**
366:             * Receive the message. All messages are ignored.
367:             *
368:             * @param msg message to check.
369:             */
370:            public void receive(org.jgroups.Message msg) {
371:                // do nothing
372:            }
373:
374:            /**
375:             * Set the channel state. We do nothing here.
376:             */
377:            public void setState(byte[] state) {
378:                // ignore the state, we do not have any.
379:            }
380:
381:            private final Set voteListeners = new HashSet();
382:            private VotingListener[] listeners;
383:
384:            /**
385:             * Vote on the specified decree requiring all nodes to vote.
386:             * 
387:             * @param decree decree on which nodes should vote.
388:             * @param timeout time during which nodes can vote.
389:             * 
390:             * @return <code>true</code> if nodes agreed on a decree, otherwise 
391:             * <code>false</code>
392:             * 
393:             * @throws ChannelException if something went wrong.
394:             */
395:            public boolean vote(Object decree, long timeout)
396:                    throws ChannelException {
397:                return vote(decree, timeout, null);
398:            }
399:
400:            /**
401:             * Vote on the specified decree requiring all nodes to vote.
402:             * 
403:             * @param decree decree on which nodes should vote.
404:             * @param timeout time during which nodes can vote.
405:             * @param voteResponseProcessor processor which will be called for every response that is received.
406:             * 
407:             * @return <code>true</code> if nodes agreed on a decree, otherwise 
408:             * <code>false</code>
409:             * 
410:             * @throws ChannelException if something went wrong.
411:             */
412:            public boolean vote(Object decree, long timeout,
413:                    VoteResponseProcessor voteResponseProcessor)
414:                    throws ChannelException {
415:                return vote(decree, VOTE_ALL, timeout, voteResponseProcessor);
416:            }
417:
418:            /**
419:             * Adds voting listener.
420:             */
421:            public void addVoteListener(VotingListener listener) {
422:                voteListeners.add(listener);
423:                listeners = (VotingListener[]) voteListeners
424:                        .toArray(new VotingListener[voteListeners.size()]);
425:            }
426:
427:            /**
428:             * Removes voting listener.
429:             */
430:            public void removeVoteListener(VotingListener listener) {
431:                voteListeners.remove(listener);
432:
433:                listeners = (VotingListener[]) voteListeners
434:                        .toArray(new VotingListener[voteListeners.size()]);
435:            }
436:
437:            /**
438:             * This method performs voting on the specific decree between all
439:             * local voteListeners.
440:             */
441:            public VoteResult localVote(Object decree) {
442:
443:                VoteResult voteResult = new VoteResult();
444:
445:                for (int i = 0; i < listeners.length; i++) {
446:                    VotingListener listener = listeners[i];
447:
448:                    try {
449:                        voteResult.addVote(listener.vote(decree));
450:                    } catch (VoteException vex) {
451:                        // do nothing here.
452:                    } catch (RuntimeException ex) {
453:
454:                        if (log.isErrorEnabled())
455:                            log.error(ex.toString());
456:
457:                        // if we are here, then listener 
458:                        // had thrown a RuntimeException
459:                        return new FailureVoteResult(ex.getMessage());
460:                    }
461:                }
462:
463:                if (log.isDebugEnabled())
464:                    log.debug("Voting on decree " + decree.toString() + " : "
465:                            + voteResult.toString());
466:
467:                return voteResult;
468:            }
469:
470:            /**
471:             * Convert consensus type into string representation. This method is 
472:             * useful for debugginf.
473:             * 
474:             * @param consensusType type of the consensus.
475:             * 
476:             * @return string representation of the consensus type.
477:             */
478:            public static String getConsensusStr(int consensusType) {
479:                switch (consensusType) {
480:                case VotingAdapter.VOTE_ALL:
481:                    return "VOTE_ALL";
482:                case VotingAdapter.VOTE_ANY:
483:                    return "VOTE_ANY";
484:                case VotingAdapter.VOTE_MAJORITY:
485:                    return "VOTE_MAJORITY";
486:                default:
487:                    return "UNKNOWN";
488:                }
489:            }
490:
491:            /**
492:             * This class represents the result of local voting. It contains a 
493:             * number of positive and negative votes collected during local voting.
494:             */
495:            public static class VoteResult implements  Serializable {
496:                private int positiveVotes = 0;
497:                private int negativeVotes = 0;
498:                private static final long serialVersionUID = 2868605599965196746L;
499:
500:                public void addVote(boolean vote) {
501:                    if (vote)
502:                        positiveVotes++;
503:                    else
504:                        negativeVotes++;
505:                }
506:
507:                public int getPositiveVotes() {
508:                    return positiveVotes;
509:                }
510:
511:                public int getNegativeVotes() {
512:                    return negativeVotes;
513:                }
514:
515:                public String toString() {
516:                    return "VoteResult: up=" + positiveVotes + ", down="
517:                            + negativeVotes;
518:                }
519:            }
520:
521:            /**
522:             * Class that represents a result of local voting on the failed node.
523:             */
524:            public static class FailureVoteResult extends VoteResult {
525:                private final String reason;
526:
527:                public FailureVoteResult(String reason) {
528:                    this .reason = reason;
529:                }
530:
531:                public String getReason() {
532:                    return reason;
533:                }
534:            }
535:
536:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.