Source Code Cross Referenced for DistributedQueue.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » blocks » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.blocks 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: DistributedQueue.java,v 1.19 2006/07/31 09:21:58 belaban Exp $
002:        package org.jgroups.blocks;
003:
004:        import org.apache.commons.logging.Log;
005:        import org.apache.commons.logging.LogFactory;
006:        import org.jgroups.*;
007:        import org.jgroups.util.RspList;
008:        import org.jgroups.util.Util;
009:
010:        import java.io.Serializable;
011:        import java.util.*;
012:
013:        /**
014:         * Provides the abstraction of a java.util.LinkedList that is replicated at several
015:         * locations. Any change to the list (reset, add, remove, etc.) will transparently be
016:         * propagated to all replicas in the group. All read-only methods will always access the
017:         * local replica.<p>
018:         * Both keys and values added to the list <em>must be serializable</em>, the reason
019:         * being that they will be sent across the network to all replicas of the group.
020:         * An instance of this class will contact an existing member of the group to fetch its
021:         * initial state.
022:         * Beware to use a <em>total protocol</em> on initialization or elements would not be in same
023:         * order on all replicas.
024:         * @author Romuald du Song
025:         */
026:        public class DistributedQueue implements  MessageListener,
027:                MembershipListener, Cloneable {
028:            public interface Notification {
029:                void entryAdd(Object value);
030:
031:                void entryRemoved(Object key);
032:
033:                void viewChange(Vector new_mbrs, Vector old_mbrs);
034:
035:                void contentsCleared();
036:
037:                void contentsSet(Collection new_entries);
038:            }
039:
040:            protected Log logger = LogFactory.getLog(getClass());
041:            private long internal_timeout = 10000; // 10 seconds to wait for a response
042:
043:            /*lock object for synchronization*/
044:            protected final Object mutex = new Object();
045:            protected boolean stopped = false; // whether to we are stopped !
046:            protected LinkedList internalQueue;
047:            protected Channel channel;
048:            protected RpcDispatcher disp = null;
049:            protected String groupname = null;
050:            protected Vector notifs = new Vector(); // to be notified when mbrship changes
051:            protected Vector members = new Vector(); // keeps track of all DHTs
052:            private Class[] add_signature = null;
053:            private Class[] addAtHead_signature = null;
054:            private Class[] addAll_signature = null;
055:            private Class[] reset_signature = null;
056:            private Class[] remove_signature = null;
057:
058:            /**
059:             * Creates a DistributedQueue
060:             * @param groupname The name of the group to join
061:             * @param factory The ChannelFactory which will be used to create a channel
062:             * @param properties The property string to be used to define the channel
063:             * @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever.
064:             */
065:            public DistributedQueue(String groupname, ChannelFactory factory,
066:                    String properties, long state_timeout)
067:                    throws ChannelException {
068:                if (logger.isDebugEnabled()) {
069:                    logger.debug("DistributedQueue(" + groupname + ','
070:                            + properties + ',' + state_timeout);
071:                }
072:
073:                this .groupname = groupname;
074:                initSignatures();
075:                internalQueue = new LinkedList();
076:                channel = (factory != null) ? factory.createChannel(properties)
077:                        : new JChannel(properties);
078:                disp = new RpcDispatcher(channel, this , this , this );
079:                disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
080:                channel.connect(groupname);
081:                start(state_timeout);
082:            }
083:
084:            public DistributedQueue(JChannel channel) {
085:                this .groupname = channel.getClusterName();
086:                this .channel = channel;
087:                init();
088:            }
089:
090:            /**
091:             * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
092:             * used to register under that id. This is typically used when another building block is already using
093:             * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
094:             * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
095:             * first block created on PullPushAdapter.
096:             * The caller needs to call start(), before using the this block. It gives the opportunity for the caller
097:             * to register as a lessoner for Notifications events.
098:             * @param adapter The PullPushAdapter which to use as underlying transport
099:             * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
100:             *           requests/responses for different building blocks on top of PullPushAdapter.
101:             */
102:            public DistributedQueue(PullPushAdapter adapter, Serializable id) {
103:                this .channel = (Channel) adapter.getTransport();
104:                this .groupname = this .channel.getClusterName();
105:
106:                initSignatures();
107:                internalQueue = new LinkedList();
108:
109:                disp = new RpcDispatcher(adapter, id, this , this , this );
110:                disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
111:            }
112:
113:            protected final void init() {
114:                initSignatures();
115:                internalQueue = new LinkedList();
116:                disp = new RpcDispatcher(channel, this , this , this );
117:                disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
118:            }
119:
120:            public final void start(long state_timeout)
121:                    throws ChannelClosedException, ChannelNotConnectedException {
122:                boolean rc;
123:                logger.debug("DistributedQueue.initState(" + groupname
124:                        + "): starting state retrieval");
125:
126:                rc = channel.getState(null, state_timeout);
127:
128:                if (rc) {
129:                    logger.info("DistributedQueue.initState(" + groupname
130:                            + "): state was retrieved successfully");
131:                } else {
132:                    logger.info("DistributedQueue.initState(" + groupname
133:                            + "): state could not be retrieved (first member)");
134:                }
135:            }
136:
137:            public Address getLocalAddress() {
138:                return (channel != null) ? channel.getLocalAddress() : null;
139:            }
140:
141:            public Channel getChannel() {
142:                return channel;
143:            }
144:
145:            public void addNotifier(Notification n) {
146:                if (n != null && !notifs.contains(n)) {
147:                    notifs.addElement(n);
148:                }
149:            }
150:
151:            public void removeNotifier(Notification n) {
152:                notifs.removeElement(n);
153:            }
154:
155:            public void stop() {
156:                /*lock the queue from other threads*/
157:                synchronized (mutex) {
158:                    internalQueue.clear();
159:
160:                    if (disp != null) {
161:                        disp.stop();
162:                        disp = null;
163:                    }
164:
165:                    if (channel != null) {
166:                        channel.close();
167:                        channel = null;
168:                    }
169:
170:                    stopped = true;
171:                }
172:            }
173:
174:            /**
175:             * Add the speficied element at the bottom of the queue
176:             * @param value
177:             */
178:            public void add(Object value) {
179:                try {
180:                    Object retval = null;
181:
182:                    RspList rsp = disp.callRemoteMethods(null, "_add",
183:                            new Object[] { value }, add_signature,
184:                            GroupRequest.GET_ALL, 0);
185:                    Vector results = rsp.getResults();
186:
187:                    if (results.size() > 0) {
188:                        retval = results.elementAt(0);
189:
190:                        if (logger.isDebugEnabled()) {
191:                            checkResult(rsp, retval);
192:                        }
193:                    }
194:                } catch (Exception e) {
195:                    logger.error("Unable to add value " + value, e);
196:                }
197:
198:            }
199:
200:            /**
201:             * Add the speficied element at the top of the queue
202:             * @param value
203:             */
204:            public void addAtHead(Object value) {
205:                try {
206:                    disp.callRemoteMethods(null, "_addAtHead",
207:                            new Object[] { value }, addAtHead_signature,
208:                            GroupRequest.GET_ALL, 0);
209:                } catch (Exception e) {
210:                    logger.error("Unable to addAtHead value " + value, e);
211:                }
212:
213:            }
214:
215:            /**
216:             * Add the speficied collection to the top of the queue.
217:             * Elements are added in the order that they are returned by the specified
218:             * collection's iterator.
219:             * @param values
220:             */
221:            public void addAll(Collection values) {
222:                try {
223:                    disp.callRemoteMethods(null, "_addAll",
224:                            new Object[] { values }, addAll_signature,
225:                            GroupRequest.GET_ALL, 0);
226:                } catch (Exception e) {
227:                    logger.error("Unable to addAll value: " + values, e);
228:                }
229:
230:            }
231:
232:            public Vector getContents() {
233:                Vector result = new Vector();
234:
235:                for (Iterator e = internalQueue.iterator(); e.hasNext();)
236:                    result.add(e.next());
237:
238:                return result;
239:            }
240:
241:            public int size() {
242:                return internalQueue.size();
243:            }
244:
245:            /**
246:             * returns the first object on the queue, without removing it.
247:             * If the queue is empty this object blocks until the first queue object has
248:             * been added
249:             * @return the first object on the queue
250:             */
251:            public Object peek() {
252:                Object retval = null;
253:
254:                try {
255:                    retval = internalQueue.getFirst();
256:                } catch (NoSuchElementException e) {
257:                }
258:
259:                return retval;
260:            }
261:
262:            public void reset() {
263:                try {
264:                    disp.callRemoteMethods(null, "_reset", null,
265:                            reset_signature, GroupRequest.GET_ALL, 0);
266:                } catch (Exception e) {
267:                    logger
268:                            .error("DistributedQueue.reset(" + groupname + ')',
269:                                    e);
270:                }
271:            }
272:
273:            protected void checkResult(RspList rsp, Object retval) {
274:                if (logger.isDebugEnabled()) {
275:                    logger.debug("Value updated from " + groupname + " :"
276:                            + retval);
277:                }
278:
279:                Vector results = rsp.getResults();
280:
281:                for (int i = 0; i < results.size(); i++) {
282:                    Object data = results.elementAt(i);
283:
284:                    if (!data.equals(retval)) {
285:                        logger
286:                                .error("Reference value differs from returned value "
287:                                        + retval + " != " + data);
288:                    }
289:                }
290:            }
291:
292:            /**
293:             * Try to return the first objet in the queue.It does not wait for an object.
294:             * @return the first object in the queue or null if none were found.
295:             */
296:            public Object remove() {
297:                Object retval = null;
298:                RspList rsp = disp.callRemoteMethods(null, "_remove", null,
299:                        remove_signature, GroupRequest.GET_ALL,
300:                        internal_timeout);
301:                Vector results = rsp.getResults();
302:
303:                if (results.size() > 0) {
304:                    retval = results.elementAt(0);
305:
306:                    if (logger.isDebugEnabled()) {
307:                        checkResult(rsp, retval);
308:                    }
309:                }
310:
311:                return retval;
312:            }
313:
314:            /**
315:             * @param timeout The time to wait until an entry is retrieved in milliseconds. A value of 0 means wait forever.
316:             * @return the first object in the queue or null if none were found
317:             */
318:            public Object remove(long timeout) {
319:                Object retval = null;
320:                long start = System.currentTimeMillis();
321:
322:                if (timeout <= 0) {
323:                    while (!stopped && (retval == null)) {
324:                        RspList rsp = disp.callRemoteMethods(null, "_remove",
325:                                null, remove_signature, GroupRequest.GET_ALL,
326:                                internal_timeout);
327:                        Vector results = rsp.getResults();
328:
329:                        if (results.size() > 0) {
330:                            retval = results.elementAt(0);
331:
332:                            if (logger.isDebugEnabled()) {
333:                                checkResult(rsp, retval);
334:                            }
335:                        }
336:
337:                        if (retval == null) {
338:                            try {
339:                                synchronized (mutex) {
340:                                    mutex.wait();
341:                                }
342:                            } catch (InterruptedException e) {
343:                            }
344:                        }
345:                    }
346:                } else {
347:                    while (((System.currentTimeMillis() - start) < timeout)
348:                            && !stopped && (retval == null)) {
349:                        RspList rsp = disp.callRemoteMethods(null, "_remove",
350:                                null, remove_signature, GroupRequest.GET_ALL,
351:                                internal_timeout);
352:                        Vector results = rsp.getResults();
353:
354:                        if (results.size() > 0) {
355:                            retval = results.elementAt(0);
356:
357:                            if (logger.isDebugEnabled()) {
358:                                checkResult(rsp, retval);
359:                            }
360:                        }
361:
362:                        if (retval == null) {
363:                            try {
364:                                long delay = timeout
365:                                        - (System.currentTimeMillis() - start);
366:
367:                                synchronized (mutex) {
368:                                    if (delay > 0) {
369:                                        mutex.wait(delay);
370:                                    }
371:                                }
372:                            } catch (InterruptedException e) {
373:                            }
374:                        }
375:                    }
376:                }
377:
378:                return retval;
379:            }
380:
381:            public String toString() {
382:                return internalQueue.toString();
383:            }
384:
385:            /*------------------------ Callbacks -----------------------*/
386:            public void _add(Object value) {
387:                if (logger.isDebugEnabled()) {
388:                    logger.debug(groupname + '@' + getLocalAddress() + " _add("
389:                            + value + ')');
390:                }
391:
392:                /*lock the queue from other threads*/
393:                synchronized (mutex) {
394:                    internalQueue.add(value);
395:
396:                    /*wake up all the threads that are waiting for the lock to be released*/
397:                    mutex.notifyAll();
398:                }
399:
400:                for (int i = 0; i < notifs.size(); i++)
401:                    ((Notification) notifs.elementAt(i)).entryAdd(value);
402:            }
403:
404:            public void _addAtHead(Object value) {
405:                /*lock the queue from other threads*/
406:                synchronized (mutex) {
407:                    internalQueue.addFirst(value);
408:
409:                    /*wake up all the threads that are waiting for the lock to be released*/
410:                    mutex.notifyAll();
411:                }
412:
413:                for (int i = 0; i < notifs.size(); i++)
414:                    ((Notification) notifs.elementAt(i)).entryAdd(value);
415:            }
416:
417:            public void _reset() {
418:                if (logger.isDebugEnabled()) {
419:                    logger.debug(groupname + '@' + getLocalAddress()
420:                            + " _reset()");
421:                }
422:
423:                _private_reset();
424:
425:                for (int i = 0; i < notifs.size(); i++)
426:                    ((Notification) notifs.elementAt(i)).contentsCleared();
427:            }
428:
429:            protected void _private_reset() {
430:                /*lock the queue from other threads*/
431:                synchronized (mutex) {
432:                    internalQueue.clear();
433:
434:                    /*wake up all the threads that are waiting for the lock to be released*/
435:                    mutex.notifyAll();
436:                }
437:            }
438:
439:            public Object _remove() {
440:                Object retval = null;
441:
442:                try {
443:                    /*lock the queue from other threads*/
444:                    synchronized (mutex) {
445:                        retval = internalQueue.removeFirst();
446:
447:                        /*wake up all the threads that are waiting for the lock to be released*/
448:                        mutex.notifyAll();
449:                    }
450:
451:                    if (logger.isDebugEnabled()) {
452:                        logger.debug(groupname + '@' + getLocalAddress()
453:                                + "_remove(" + retval + ')');
454:                    }
455:
456:                    for (int i = 0; i < notifs.size(); i++)
457:                        ((Notification) notifs.elementAt(i))
458:                                .entryRemoved(retval);
459:                } catch (NoSuchElementException e) {
460:                    logger.debug(groupname + '@' + getLocalAddress()
461:                            + "_remove(): nothing to remove");
462:                }
463:
464:                return retval;
465:            }
466:
467:            public void _addAll(Collection c) {
468:                if (logger.isDebugEnabled()) {
469:                    logger.debug(groupname + '@' + getLocalAddress()
470:                            + " _addAll(" + c + ')');
471:                }
472:
473:                /*lock the queue from other threads*/
474:                synchronized (mutex) {
475:                    internalQueue.addAll(c);
476:
477:                    /*wake up all the threads that are waiting for the lock to be released*/
478:                    mutex.notifyAll();
479:                }
480:
481:                for (int i = 0; i < notifs.size(); i++)
482:                    ((Notification) notifs.elementAt(i)).contentsSet(c);
483:            }
484:
485:            /*----------------------------------------------------------*/
486:            /*-------------------- State Exchange ----------------------*/
487:            public void receive(Message msg) {
488:            }
489:
490:            public byte[] getState() {
491:                Vector copy = (Vector) getContents().clone();
492:
493:                try {
494:                    return Util.objectToByteBuffer(copy);
495:                } catch (Throwable ex) {
496:                    logger
497:                            .error(
498:                                    "DistributedQueue.getState(): exception marshalling state.",
499:                                    ex);
500:
501:                    return null;
502:                }
503:            }
504:
505:            public void setState(byte[] new_state) {
506:                Vector new_copy;
507:
508:                try {
509:                    new_copy = (Vector) Util.objectFromByteBuffer(new_state);
510:
511:                    if (new_copy == null) {
512:                        return;
513:                    }
514:                } catch (Throwable ex) {
515:                    logger
516:                            .error(
517:                                    "DistributedQueue.setState(): exception unmarshalling state.",
518:                                    ex);
519:
520:                    return;
521:                }
522:
523:                _private_reset(); // remove all elements      
524:                _addAll(new_copy);
525:            }
526:
527:            /*------------------- Membership Changes ----------------------*/
528:            public void viewAccepted(View new_view) {
529:                Vector new_mbrs = new_view.getMembers();
530:
531:                if (new_mbrs != null) {
532:                    sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
533:                    members.removeAllElements();
534:
535:                    for (int i = 0; i < new_mbrs.size(); i++)
536:                        members.addElement(new_mbrs.elementAt(i));
537:                }
538:            }
539:
540:            /** Called when a member is suspected */
541:            public void suspect(Address suspected_mbr) {
542:                ;
543:            }
544:
545:            /** Block sending and receiving of messages until ViewAccepted is called */
546:            public void block() {
547:            }
548:
549:            void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) {
550:                Vector joined;
551:                Vector left;
552:                Object mbr;
553:                Notification n;
554:
555:                if ((notifs.size() == 0) || (old_mbrs == null)
556:                        || (new_mbrs == null) || (old_mbrs.size() == 0)
557:                        || (new_mbrs.size() == 0)) {
558:                    return;
559:                }
560:
561:                // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
562:                joined = new Vector();
563:
564:                for (int i = 0; i < new_mbrs.size(); i++) {
565:                    mbr = new_mbrs.elementAt(i);
566:
567:                    if (!old_mbrs.contains(mbr)) {
568:                        joined.addElement(mbr);
569:                    }
570:                }
571:
572:                // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
573:                left = new Vector();
574:
575:                for (int i = 0; i < old_mbrs.size(); i++) {
576:                    mbr = old_mbrs.elementAt(i);
577:
578:                    if (!new_mbrs.contains(mbr)) {
579:                        left.addElement(mbr);
580:                    }
581:                }
582:
583:                for (int i = 0; i < notifs.size(); i++) {
584:                    n = (Notification) notifs.elementAt(i);
585:                    n.viewChange(joined, left);
586:                }
587:            }
588:
589:            final void initSignatures() {
590:                try {
591:                    if (add_signature == null) {
592:                        add_signature = new Class[] { Object.class };
593:                    }
594:
595:                    if (addAtHead_signature == null) {
596:                        addAtHead_signature = new Class[] { Object.class };
597:                    }
598:
599:                    if (addAll_signature == null) {
600:                        addAll_signature = new Class[] { Collection.class };
601:                    }
602:
603:                    if (reset_signature == null) {
604:                        reset_signature = new Class[0];
605:                    }
606:
607:                    if (remove_signature == null) {
608:                        remove_signature = new Class[0];
609:                    }
610:                } catch (Throwable ex) {
611:                    logger.error("DistributedQueue.initMethods()", ex);
612:                }
613:            }
614:
615:            public static void main(String[] args) {
616:                try {
617:                    // The setup here is kind of weird:
618:                    // 1. Create a channel
619:                    // 2. Create a DistributedQueue (on the channel)
620:                    // 3. Connect the channel (so the HT gets a VIEW_CHANGE)
621:                    // 4. Start the HT
622:                    //
623:                    // A simpler setup is
624:                    // DistributedQueue ht = new DistributedQueue("demo", null, 
625:                    //         "file://c:/JGroups-2.0/conf/total-token.xml", 5000);
626:                    JChannel c = new JChannel(
627:                            "file:/c:/JGroups-2.0/conf/conf/total-token.xml");
628:
629:                    DistributedQueue ht = new DistributedQueue(c);
630:                    c.connect("demo");
631:                    ht.start(5000);
632:
633:                    ht.add("name");
634:                    ht.add("Michelle Ban");
635:
636:                    Object old_key = ht.remove();
637:                    System.out.println("old key was " + old_key);
638:                    old_key = ht.remove();
639:                    System.out.println("old value was " + old_key);
640:
641:                    ht.add("name 'Michelle Ban'");
642:
643:                    System.out.println("queue is " + ht);
644:                } catch (Throwable t) {
645:                    t.printStackTrace();
646:                }
647:            }
648:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.