Source Code Cross Referenced for DistributedLockManager.java in JGroups-2.4.1-sp3 (package org.jgroups.blocks)
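The ownership rule that the listing below applies in canLock, localLock and localRelease can be illustrated in isolation. What follows is only a minimal sketch, not the JGroups implementation: LockTableSketch, tryLock and release are hypothetical names introduced for illustration, and the sketch leaves out voting, lock expiration and the prepare/commit phases entirely.

```java
import java.util.HashMap;
import java.util.Map;

public class LockTableSketch {
    // lock key -> owner currently holding it (hypothetical stand-in for heldLocks)
    private final Map<Object, Object> held = new HashMap<>();

    /** Grants the lock if the key is free or already held by the same owner. */
    public synchronized boolean tryLock(Object key, Object owner) {
        Object current = held.get(key);
        if (current == null) {
            held.put(key, owner);
            return true;
        }
        return current.equals(owner);
    }

    /** Succeeds if nothing is held under the key or the caller owns the lock. */
    public synchronized boolean release(Object key, Object owner) {
        Object current = held.get(key);
        if (current == null) {
            return true;
        }
        if (current.equals(owner)) {
            held.remove(key);
            return true;
        }
        return false; // lock belongs to somebody else
    }

    public static void main(String[] args) {
        LockTableSketch table = new LockTableSketch();
        System.out.println(table.tryLock("obj1", "nodeA")); // true: key was free
        System.out.println(table.tryLock("obj1", "nodeA")); // true: same owner again
        System.out.println(table.tryLock("obj1", "nodeB")); // false: held by nodeA
        System.out.println(table.release("obj1", "nodeB")); // false: not the owner
        System.out.println(table.release("obj1", "nodeA")); // true: owner releases
        System.out.println(table.tryLock("obj1", "nodeB")); // true: key is free again
    }
}
```

In the real class the same check is made on every group member during a two-phase vote, so a lock is granted only when no member objects.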


package org.jgroups.blocks;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.ChannelException;
import org.jgroups.MembershipListener;
import org.jgroups.View;
import org.jgroups.Address;
import org.jgroups.blocks.VotingAdapter.FailureVoteResult;
import org.jgroups.blocks.VotingAdapter.VoteResult;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;

import java.io.Serializable;
import java.util.*;

/**
 * Distributed lock manager is responsible for keeping the lock information
 * consistent on all participating nodes.
 * 
 * @author Roman Rokytskyy (rrokytskyy@acm.org)
 * @author Robert Schaffar-Taurok (robert@fusion.at)
 * @version $Id: DistributedLockManager.java,v 1.8 2006/08/15 09:18:53 belaban Exp $
 */
public class DistributedLockManager implements TwoPhaseVotingListener,
        LockManager, VoteResponseProcessor, MembershipListener {
    /**
     * Definitions for the implementation of the VoteResponseProcessor
     */
    private static final int PROCESS_CONTINUE = 0;
    private static final int PROCESS_SKIP = 1;
    private static final int PROCESS_BREAK = 2;

    /**
     * Lock acquisition expires after 5 seconds. If no "commit" operation
     * follows on a prepared lock within that time, the lock is treated as
     * expired and is removed from the prepared locks table.
     */
    private static final long ACQUIRE_EXPIRATION = 5000;

    /**
     * Default timeout used during lock releasing. If the group fails to
     * release the lock within this period of time, unlocking fails.
     */
    private static final long VOTE_TIMEOUT = 10000;

    /** HashMap<Object,LockDecree>. List of all prepared locks */
    private final HashMap preparedLocks = new HashMap();

    /** HashMap<Object,LockDecree>. List of all prepared releases */
    private final HashMap preparedReleases = new HashMap();

    /** HashMap<Object,LockDecree>. List of locks on the node */
    private final HashMap heldLocks = new HashMap();

    private final TwoPhaseVotingAdapter votingAdapter;

    private final Object id;

    final Vector current_members = new Vector();

    protected final Log log = LogFactory.getLog(getClass());

    /**
     * Creates an instance of this class.
     * 
     * @param voteChannel instance of {@link VotingAdapter} that will be used 
     * for voting purposes on the lock decrees. <tt>voteChannel</tt> will
     * be wrapped by an instance of {@link TwoPhaseVotingAdapter}.
     * 
     * @param id the unique identifier of this lock manager.
     * 
     * @todo check if the node with the same id is already in the group.
     */
    public DistributedLockManager(VotingAdapter voteChannel, Object id) {
        this(new TwoPhaseVotingAdapter(voteChannel), id);
    }

    /**
     * Creates an instance of this class.
     * 
     * @param channel instance of {@link TwoPhaseVotingAdapter}
     * that will be used for voting purposes on the lock decrees.
     * 
     * @param id the unique identifier of this lock manager.
     * 
     * @todo check if the node with the same id is already in the group.
     */
    public DistributedLockManager(TwoPhaseVotingAdapter channel, Object id) {
        this.id = id;
        this.votingAdapter = channel;
        this.votingAdapter.addListener(this);
        // Both calls need a vote channel, so both go inside the guarded block;
        // without braces the membership call ran even when the channel was null.
        if (votingAdapter.getVoteChannel() != null) {
            votingAdapter.getVoteChannel().addMembershipListener(this);
            setInitialMembership(votingAdapter.getVoteChannel().getMembers());
        }
    }

    private void setInitialMembership(Collection members) {
        if (members != null) {
            current_members.clear();
            current_members.addAll(members);
        }
    }

    /**
     * Performs local lock. This method also cleans up the lock table: an
     * expired lock held under the same key is removed first.
     */
    private boolean localLock(LockDecree lockDecree) {
        // remove expired locks
        removeExpired(lockDecree);

        LockDecree localLock = (LockDecree) heldLocks.get(lockDecree.getKey());

        if (localLock == null) {

            // promote lock into committed state
            lockDecree.commit();

            // no lock exists, perform local lock; note that
            // we do not store locks that were requested by another manager.
            if (lockDecree.managerId.equals(id))
                heldLocks.put(lockDecree.getKey(), lockDecree);

            // everything is fine :)
            return true;
        } else
            return localLock.requester.equals(lockDecree.requester);

    }

    /**
     * Returns <code>true</code> if the requested lock can be granted by the
     * current node.
     * 
     * @param decree instance of <code>LockDecree</code> containing information
     * about the lock.
     */
    private boolean canLock(LockDecree decree) {
        // clean expired locks
        removeExpired(decree);

        LockDecree lock = (LockDecree) heldLocks.get(decree.getKey());
        if (lock == null)
            return true;
        else
            return lock.requester.equals(decree.requester);
    }

    /**
     * Returns <code>true</code> if the requested lock can be released by the
     * current node.
     * 
     * @param decree instance of {@link LockDecree} containing information
     * about the lock.
     */
    private boolean canRelease(LockDecree decree) {
        // clean expired locks
        removeExpired(decree);

        // we need to check only held locks, because
        // prepared locks cannot contain the lock
        LockDecree lock = (LockDecree) heldLocks.get(decree.getKey());
        if (lock == null)
            // check if this holds...
            return true;
        else
            return lock.requester.equals(decree.requester);
    }

    /**
     * Removes the held lock for the decree's key if that lock has expired.
     * 
     * @param decree instance of {@link LockDecree} describing the lock.
     */
    private void removeExpired(LockDecree decree) {
        // remove the invalid (expired) lock
        LockDecree localLock = (LockDecree) heldLocks.get(decree.getKey());
        if (localLock != null && !localLock.isValid())
            heldLocks.remove(localLock.getKey());
    }

    /**
     * Releases lock locally.
     * 
     * @param lockDecree instance of {@link LockDecree} describing the lock.
     */
    private boolean localRelease(LockDecree lockDecree) {
        // remove expired locks
        removeExpired(lockDecree);

        LockDecree localLock = (LockDecree) heldLocks.get(lockDecree.getKey());

        if (localLock == null) {
            // no lock exists
            return true;
        } else if (localLock.requester.equals(lockDecree.requester)) {
            // requester owns the lock, release the lock
            heldLocks.remove(lockDecree.getKey());
            return true;
        } else
            // lock does not belong to requester
            return false;
    }

    /**
     * Locks an object with <code>lockId</code> on behalf of the specified
     * <code>owner</code>.
     * 
     * @param lockId <code>Object</code> representing the object to be locked.
     * @param owner object that requests the lock. This should be the Address
     * of a JGroups member, otherwise we cannot release the locks for a
     * crashed member!
     * @param timeout time during which group members should decide
     * whether to grant a lock or not.
     *
     * @throws LockNotGrantedException when the lock cannot be granted.
     * 
     * @throws ClassCastException if lockId or owner are not serializable.
     * 
     * @throws ChannelException if something bad happened to underlying channel.
     */
    public void lock(Object lockId, Object owner, int timeout)
            throws LockNotGrantedException, ChannelException {
        if (!(lockId instanceof Serializable)
                || !(owner instanceof Serializable))
            throw new ClassCastException(
                    "DistributedLockManager works only with serializable objects.");

        boolean acquired = votingAdapter.vote(new AcquireLockDecree(
                lockId, owner, id), timeout);

        if (!acquired)
            throw new LockNotGrantedException("Lock " + lockId
                    + " cannot be granted.");
    }

    /**
     * Unlocks an object with <code>lockId</code> on behalf of the specified
     * <code>owner</code>.
     * 
     * Since 2.2.9 this method is only a wrapper for 
     * unlock(Object lockId, Object owner, boolean releaseMultiLocked).
     * Use that with releaseMultiLocked set to true if you want to be able to
     * release multiple locked locks (for example after a merge).
     * 
     * @param lockId <code>Object</code> representing the object to be unlocked.
     * @param owner object that releases the lock.
     *
     * @throws LockNotReleasedException when the lock cannot be released.
     * @throws ClassCastException if lockId or owner are not serializable.
     * @throws ChannelException if something bad happened to underlying channel.
     */
    public void unlock(Object lockId, Object owner)
            throws LockNotReleasedException, ChannelException {
        try {
            unlock(lockId, owner, false, VOTE_TIMEOUT);
        } catch (LockMultiLockedException e) {
            // This should never happen when releaseMultiLocked is false
            log.error("Caught MultiLockedException but releaseMultiLocked is false", e);
        }
    }

    public void unlock(Object lockId, Object owner, long timeout)
            throws LockNotReleasedException, ChannelException {
        try {
            unlock(lockId, owner, false, timeout);
        } catch (LockMultiLockedException e) {
            // This should never happen when releaseMultiLocked is false
            log.error("Caught MultiLockedException but releaseMultiLocked is false", e);
        }
    }

    /**
     * Unlocks an object with <code>lockId</code> on behalf of the specified
     * <code>owner</code>.
     * @param lockId <code>Object</code> representing the object to be unlocked.
     * @param owner object that releases the lock.
     * @param releaseMultiLocked also releases multiple locked locks (e.g. locks
     * that are locked by another DLM after a merge).
     *
     * @throws LockNotReleasedException when the lock cannot be released.
     * @throws ClassCastException if lockId or owner are not serializable.
     * @throws LockMultiLockedException if releaseMultiLocked is true and a
     * multiple locked lock has been released.
     */
    public void unlock(Object lockId, Object owner,
            boolean releaseMultiLocked)
            throws LockNotReleasedException, ChannelException,
            LockMultiLockedException {
        unlock(lockId, owner, releaseMultiLocked, VOTE_TIMEOUT);
    }

    public void unlock(Object lockId, Object owner,
            boolean releaseMultiLocked, long timeout)
            throws LockNotReleasedException, ChannelException,
            LockMultiLockedException {

        if (!(lockId instanceof Serializable)
                || !(owner instanceof Serializable))
            throw new ClassCastException("DistributedLockManager "
                    + "works only with serializable objects.");

        ReleaseLockDecree releaseLockDecree = new ReleaseLockDecree(
                lockId, owner, id);
        boolean released = false;
        if (releaseMultiLocked) {
            released = votingAdapter.vote(releaseLockDecree, timeout, this);
            if (releaseLockDecree.isMultipleLocked()) {
                throw new LockMultiLockedException(
                        "Lock was also locked by other DistributedLockManager(s)");
            }
        } else {
            released = votingAdapter.vote(releaseLockDecree, timeout);
        }

        if (!released)
            throw new LockNotReleasedException("Lock cannot be unlocked.");
    }

    /**
     * Checks the list of prepared locks/unlocks to determine if we are in the
     * middle of the two-phase commit process for the lock acquisition/release.
     * An existing prepared decree is tolerated only if it was requested on
     * behalf of the same owner as the new request.
     * 
     * @param preparedContainer either <code>preparedLocks</code> or
     * <code>preparedReleases</code> depending on the situation.
     * 
     * @param requestedDecree instance of <code>LockDecree</code> representing
     * the lock.
     * 
     * @return <code>true</code> if no valid prepared decree exists for the key
     * or the prepared decree has the same requester.
     */
    private boolean checkPrepared(HashMap preparedContainer,
            LockDecree requestedDecree) {
        LockDecree preparedDecree = (LockDecree) preparedContainer
                .get(requestedDecree.getKey());

        // if prepared lock is not valid, remove it from the list
        if ((preparedDecree != null) && !preparedDecree.isValid()) {
            preparedContainer.remove(preparedDecree.getKey());

            preparedDecree = null;
        }

        if (preparedDecree != null) {
            return requestedDecree.requester.equals(preparedDecree.requester);
        } else
            // it was not prepared... sorry...
            return true;
    }

    /**
     * Prepare phase for the lock acquisition or release.
     * 
     * @param decree should be an instance of <code>LockDecree</code>; if not,
     * we throw <code>VoteException</code> to be ignored by the
     * <code>VoteChannel</code>.
     * 
     * @return <code>true</code> when preparing the lock operation succeeds.
     * 
     * @throws VoteException if we should be ignored during voting.
     */
    public synchronized boolean prepare(Object decree)
            throws VoteException {
        if (!(decree instanceof LockDecree))
            throw new VoteException("Unknown decree type. Ignore me.");

        if (decree instanceof AcquireLockDecree) {
            AcquireLockDecree acquireDecree = (AcquireLockDecree) decree;
            if (log.isDebugEnabled())
                log.debug("Preparing to acquire decree "
                        + acquireDecree.lockId);

            if (!checkPrepared(preparedLocks, acquireDecree))
                // there is a prepared lock owned by third party
                return false;

            if (canLock(acquireDecree)) {
                preparedLocks.put(acquireDecree.getKey(), acquireDecree);
                return true;
            } else
                // we are unable to acquire local lock
                return false;
        } else if (decree instanceof ReleaseLockDecree) {
            ReleaseLockDecree releaseDecree = (ReleaseLockDecree) decree;

            if (log.isDebugEnabled())
                log.debug("Preparing to release decree "
                        + releaseDecree.lockId);

            if (!checkPrepared(preparedReleases, releaseDecree))
                // there is a prepared release owned by third party
                return false;

            if (canRelease(releaseDecree)) {
                preparedReleases.put(releaseDecree.getKey(), releaseDecree);
                // the release can proceed locally and is now prepared
                return true;
            } else
                // we are unable to release the local lock
                return false;
        } else if (decree instanceof MultiLockDecree) {
            // Here we abuse the voting mechanism for notifying the other
            // lock managers of multiple locked objects.
            MultiLockDecree multiLockDecree = (MultiLockDecree) decree;

            if (log.isDebugEnabled()) {
                log.debug("Marking " + multiLockDecree.getKey()
                        + " as multilocked");
            }

            LockDecree lockDecree = (LockDecree) heldLocks
                    .get(multiLockDecree.getKey());
            if (lockDecree != null) {
                lockDecree.setMultipleLocked(true);
            }
            return true;
        }

        // we should not be here
        return false;
    }

    /**
     * Commit phase for the lock acquisition or release.
     * 
     * @param decree should be an instance of <code>LockDecree</code>; if not,
     * we throw <code>VoteException</code> to be ignored by the
     * <code>VoteChannel</code>.
     * 
     * @return <code>true</code> when committing the lock operation succeeds.
     * 
     * @throws VoteException if we should be ignored during voting.
     */
    public synchronized boolean commit(Object decree)
            throws VoteException {
        if (!(decree instanceof LockDecree))
            throw new VoteException("Unknown decree type. Ignore me.");

        if (decree instanceof AcquireLockDecree) {

            if (log.isDebugEnabled())
                log.debug("Committing decree acquisition "
                        + ((LockDecree) decree).lockId);

            if (!checkPrepared(preparedLocks, (LockDecree) decree))
                // there is a prepared lock owned by third party
                return false;

            if (localLock((LockDecree) decree)) {
                preparedLocks.remove(((LockDecree) decree).getKey());
                return true;
            } else
                return false;
        } else if (decree instanceof ReleaseLockDecree) {

            if (log.isDebugEnabled())
                log.debug("Committing decree release "
                        + ((LockDecree) decree).lockId);

            if (!checkPrepared(preparedReleases, (LockDecree) decree))
                // there is a prepared release owned by third party
                return false;

            if (localRelease((LockDecree) decree)) {
                preparedReleases.remove(((LockDecree) decree).getKey());
                return true;
            } else
                return false;
        } else if (decree instanceof MultiLockDecree) {
            return true;
        }

        // we should not be here
        return false;
    }

    /**
     * Abort phase for the lock acquisition or release.
     * 
     * @param decree should be an instance of <code>LockDecree</code>; if not,
     * we throw <code>VoteException</code> to be ignored by the
     * <code>VoteChannel</code>.
     * 
     * @throws VoteException if we should be ignored during voting.
     */
    public synchronized void abort(Object decree) throws VoteException {
        if (!(decree instanceof LockDecree))
            throw new VoteException("Unknown decree type. Ignore me.");

        if (decree instanceof AcquireLockDecree) {

            if (log.isDebugEnabled())
                log.debug("Aborting decree acquisition "
                        + ((LockDecree) decree).lockId);

            if (!checkPrepared(preparedLocks, (LockDecree) decree))
                // there is a prepared lock owned by third party
                return;

            preparedLocks.remove(((LockDecree) decree).getKey());
        } else if (decree instanceof ReleaseLockDecree) {

            if (log.isDebugEnabled())
                log.debug("Aborting decree release "
                        + ((LockDecree) decree).lockId);

            if (!checkPrepared(preparedReleases, (LockDecree) decree))
                // there is a prepared release owned by third party
                return;

            preparedReleases.remove(((LockDecree) decree).getKey());
        }

    }

    /**
     * Processes the response list and votes like the default processResponses
     * method with the consensusType VOTE_ALL. If the result of the voting is
     * false, but this DistributedLockManager owns the lock, the result is
     * changed to true and the lock is released, but marked as multiple locked
     * (only in the prepare state, to reduce traffic).
     * <p>
     * Note: we do not support voting in case of Byzantine failures, i.e.
     * when the node responds with the fault message.
     */
    public boolean processResponses(RspList responses,
            int consensusType, Object decree) throws ChannelException {
        if (responses == null) {
            return false;
        }

        int totalPositiveVotes = 0;
        int totalNegativeVotes = 0;

        for (int i = 0; i < responses.size(); i++) {
            Rsp response = (Rsp) responses.elementAt(i);

            switch (checkResponse(response)) {
            case PROCESS_SKIP:
                continue;
            case PROCESS_BREAK:
                return false;
            }

            VoteResult result = (VoteResult) response.getValue();

            totalPositiveVotes += result.getPositiveVotes();
            totalNegativeVotes += result.getNegativeVotes();
        }

        boolean voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0);

        if (decree instanceof TwoPhaseVotingAdapter.TwoPhaseWrapper) {
            TwoPhaseVotingAdapter.TwoPhaseWrapper wrappedDecree = (TwoPhaseVotingAdapter.TwoPhaseWrapper) decree;
            if (wrappedDecree.isPrepare()) {
                Object unwrappedDecree = wrappedDecree.getDecree();
                if (unwrappedDecree instanceof ReleaseLockDecree) {
                    ReleaseLockDecree releaseLockDecree = (ReleaseLockDecree) unwrappedDecree;
                    LockDecree lock = (LockDecree) heldLocks
                            .get(releaseLockDecree.getKey());
                    if (lock != null) {
                        // If there is a local lock...
                        if (!voteResult) {
                            // ... and another DLM voted negatively, but this DLM owns the lock,
                            // we inform the other node that its lock is multiple locked
                            if (informLockingNodes(releaseLockDecree)) {

                                // we set the local lock to multiple locked
                                lock.setMultipleLocked(true);

                                voteResult = true;
                            }
                        }
                        if (lock.isMultipleLocked()) {
                            // ... and the local lock is marked as multilocked,
                            // we mark the releaseLockDecree as multiple locked
                            // for evaluation when unlock returns
                            releaseLockDecree.setMultipleLocked(true);
                        }
                    }
                }
            }
        }

        return voteResult;
    }

            /**
             * Checks a single response and tells the processResponses() method
             * what to do with it.
             * @return PROCESS_CONTINUE to continue calculating votes,
             * PROCESS_BREAK to stop calculating votes from the nodes,
             * PROCESS_SKIP to skip the current response.
             * @throws ChannelException when the response is fatal to the
             * current voting process.
             */
            private int checkResponse(Rsp response) throws ChannelException {

                if (!response.wasReceived()) {

                    if (log.isDebugEnabled())
                        log.debug("Response from node " + response.getSender()
                                + " was not received.");

                    throw new ChannelException("Node " + response.getSender()
                            + " failed to respond.");
                }

                if (response.wasSuspected()) {

                    if (log.isDebugEnabled())
                        log.debug("Node " + response.getSender()
                                + " was suspected.");

                    return PROCESS_SKIP;
                }

                Object object = response.getValue();

                // we received an exception/error, so something went wrong
                // on one of the nodes... and we do not handle such faults
                if (object instanceof Throwable) {
                    throw new ChannelException("Node " + response.getSender()
                            + " is faulty.");
                }

                // a node that returned no value does not take part in the vote
                if (object == null) {
                    return PROCESS_SKIP;
                }

                // it is always interesting to know the class that caused the failure...
                if (!(object instanceof VoteResult)) {
                    String faultClass = object.getClass().getName();

                    // ...but we do not handle Byzantine faults
                    throw new ChannelException("Node " + response.getSender()
                            + " generated fault (class " + faultClass + ')');
                }

                // the node reported a failure: log the reason and stop counting votes
                if (object instanceof FailureVoteResult) {

                    if (log.isErrorEnabled())
                        log.error(((FailureVoteResult) object).getReason());

                    return PROCESS_BREAK;
                }

                // everything is fine
                return PROCESS_CONTINUE;
            }

            private boolean informLockingNodes(
                    ReleaseLockDecree releaseLockDecree)
                    throws ChannelException {
                return votingAdapter.vote(
                        new MultiLockDecree(releaseLockDecree), VOTE_TIMEOUT);
            }

            /** Remove all locks held by members who left the previous view */
            public void viewAccepted(View new_view) {
                Vector prev_view = new Vector(current_members);
                current_members.clear();
                current_members.addAll(new_view.getMembers());

                if (log.isDebugEnabled())
                    log.debug("-- VIEW: " + current_members
                            + ", old view: " + prev_view);

                // members that were in the old view but not in the new one have left
                prev_view.removeAll(current_members);
                if (prev_view.size() > 0) {
                    // some members have left, so remove any locks still held by them
                    for (Iterator it = prev_view.iterator(); it.hasNext();) {
                        Object mbr = it.next();
                        removeLocksHeldBy(preparedLocks, mbr);
                        removeLocksHeldBy(preparedReleases, mbr);
                        removeLocksHeldBy(heldLocks, mbr);
                    }
                }
            }

            /**
             * Remove from the given lock table (preparedLocks, preparedReleases
             * or heldLocks) every lock whose requester is the given member.
             */
            private void removeLocksHeldBy(Map lock_table, Object mbr) {
                Map.Entry entry;
                LockDecree val;
                Object holder;
                for (Iterator it = lock_table.entrySet().iterator(); it
                        .hasNext();) {
                    entry = (Map.Entry) it.next();
                    val = (LockDecree) entry.getValue();
                    holder = val.requester;
                    if (holder != null && holder.equals(mbr)) {
                        if (log.isTraceEnabled())
                            log.trace("removing a leftover lock held by " + mbr
                                    + " for " + entry.getKey() + ": " + val);
                        // remove through the iterator so that iteration stays valid
                        it.remove();
                    }
                }
            }

            public void suspect(Address suspected_mbr) {
            }

            public void block() {
            }

            /**
             * This class represents the lock
             */
            public static class LockDecree implements Serializable {

                protected final Object lockId;
                protected final Object requester;
                protected final Object managerId;

                protected boolean commited;

                private boolean multipleLocked = false;
                private static final long serialVersionUID = 7264104838035219212L;

                private LockDecree(Object lockId, Object requester,
                        Object managerId) {
                    this.lockId = lockId;
                    this.requester = requester;
                    this.managerId = managerId;
                }

                /**
                 * Returns the key that should be used for Map lookup.
                 */
                public Object getKey() {
                    return lockId;
                }

                /**
                 * This is a place-holder for future lock expiration code.
                 */
                public boolean isValid() {
                    return true;
                }

                public void commit() {
                    this.commited = true;
                }

                /**
                 * @return Returns the multipleLocked.
                 */
                public boolean isMultipleLocked() {
                    return multipleLocked;
                }

                /**
                 * @param multipleLocked The multipleLocked to set.
                 */
                public void setMultipleLocked(boolean multipleLocked) {
                    this.multipleLocked = multipleLocked;
                }

                /**
                 * Returns the hash code of the lock id, consistent with equals().
                 */
                public int hashCode() {
                    return lockId.hashCode();
                }

                /**
                 * Two decrees are equal when they refer to the same lock id.
                 */
                public boolean equals(Object other) {

                    if (other instanceof LockDecree) {
                        return ((LockDecree) other).lockId.equals(this.lockId);
                    } else {
                        return false;
                    }
                }
            }

            /**
             * This class represents the lock to be acquired.
             */
            public static class AcquireLockDecree extends LockDecree {
                private final long creationTime;

                private AcquireLockDecree(Object lockId, Object requester,
                        Object managerId) {
                    super(lockId, requester, managerId);
                    this.creationTime = System.currentTimeMillis();
                }

                /**
                 * A lock acquire decree is valid for the <code>ACQUIRE_EXPIRATION</code>
                 * time after creation and if the lock is still valid (in the
                 * future locks will be leased for a predefined period of time).
                 * Once the decree is committed, the expiration check no longer applies.
                 */
                public boolean isValid() {
                    boolean result = super.isValid();

                    if (!commited && result)
                        result = ((creationTime + ACQUIRE_EXPIRATION) > System
                                .currentTimeMillis());

                    return result;
                }

            }

            /**
             * This class represents the lock to be released.
             */
            public static class ReleaseLockDecree extends LockDecree {
                ReleaseLockDecree(Object lockId, Object requester,
                        Object managerId) {
                    super(lockId, requester, managerId);
                }
            }

            /**
             * This class represents the lock that has to be marked as multiple locked
             */
            public static class MultiLockDecree extends LockDecree {
                MultiLockDecree(Object lockId, Object requester,
                        Object managerId) {
                    super(lockId, requester, managerId);
                }

                MultiLockDecree(ReleaseLockDecree releaseLockDecree) {
                    super(releaseLockDecree.lockId,
                            releaseLockDecree.requester,
                            releaseLockDecree.managerId);
                }
            }
        }
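
The cleanup that viewAccepted() and removeLocksHeldBy() perform when members leave can be tried out in isolation. The following is a minimal sketch, not the JGroups implementation: the class name LeftoverLockSketch and the use of plain String values for members and lock holders are assumptions made for illustration. It shows the two steps involved: computing which members left between two views, and removing their entries through the iterator so the map can be modified safely while it is being traversed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; members and lock holders are plain Strings here.
public class LeftoverLockSketch {

    /** Returns the members that are in the old view but not in the new one. */
    static List<String> leftMembers(List<String> oldView, List<String> newView) {
        List<String> left = new ArrayList<>(oldView);
        left.removeAll(newView);
        return left;
    }

    /** Removes every lock whose holder equals mbr and returns how many were removed. */
    static int removeLocksHeldBy(Map<String, String> lockTable, String mbr) {
        int removed = 0;
        for (Iterator<Map.Entry<String, String>> it = lockTable.entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, String> entry = it.next();
            if (mbr.equals(entry.getValue())) {
                it.remove(); // removing through the iterator avoids ConcurrentModificationException
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        // lock id -> holder
        Map<String, String> heldLocks = new HashMap<>();
        heldLocks.put("lock-1", "A");
        heldLocks.put("lock-2", "B");
        heldLocks.put("lock-3", "B");

        List<String> oldView = Arrays.asList("A", "B", "C");
        List<String> newView = Arrays.asList("A", "C");

        // B has left, so both of its locks are dropped
        for (String mbr : leftMembers(oldView, newView)) {
            removeLocksHeldBy(heldLocks, mbr);
        }
        System.out.println(heldLocks); // only lock-1, held by A, remains
    }
}
```

Removing entries with a plain lockTable.remove(key) call inside the loop would invalidate the iterator, which is presumably why the original code also goes through Iterator.remove().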