Source Code Cross Referenced for ReplicatedTree.java in » Net » JGroups-2.4.1-sp3 » org » jgroups » blocks » Java Source Code / Java Documentation — Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.blocks 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        // $Id: ReplicatedTree.java,v 1.15 2006/07/31 09:21:58 belaban Exp $
0002:
0003:        package org.jgroups.blocks;
0004:
0005:        import org.apache.commons.logging.Log;
0006:        import org.apache.commons.logging.LogFactory;
0007:        import org.jgroups.*;
0008:        import org.jgroups.jmx.JmxConfigurator;
0009:        import org.jgroups.util.Queue;
0010:        import org.jgroups.util.QueueClosedException;
0011:        import org.jgroups.util.Util;
0012:
0013:        import javax.management.MBeanServer;
0014:        import java.io.Serializable;
0015:        import java.util.*;
0016:
0017:        /**
0018:         * A tree-like structure that is replicated across several members. Updates will be multicast to all group
0019:         * members reliably and in the same order.
0020:         * @author Bela Ban Jan 17 2002
0021:         * @author <a href="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a>
0022:         */
0023:        public class ReplicatedTree implements  Runnable, MessageListener,
0024:                MembershipListener {
0025:            public static final String SEPARATOR = "/";
0026:            final static int INDENT = 4;
0027:            Node root = new Node(SEPARATOR, SEPARATOR, null, null);
0028:            final Vector listeners = new Vector();
0029:            final Queue request_queue = new Queue();
0030:            Thread request_handler = null;
0031:            JChannel channel = null;
0032:            PullPushAdapter adapter = null;
0033:            String groupname = "ReplicatedTree-Group";
0034:            final Vector members = new Vector();
0035:            long state_fetch_timeout = 10000;
0036:            boolean jmx = false;
0037:
0038:            protected final Log log = LogFactory.getLog(this .getClass());
0039:
0040:            /** Whether or not to use remote calls. If false, all methods will be invoked directly on this
0041:             instance rather than sending a message to all replicas and only then invoking the method.
0042:             Useful for testing */
0043:            boolean remote_calls = true;
0044:            String props = "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;"
0045:                    + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
0046:                    + "PING(timeout=2000;num_initial_members=3):"
0047:                    + "MERGE2(min_interval=5000;max_interval=10000):"
0048:                    + "FD_SOCK:"
0049:                    + "VERIFY_SUSPECT(timeout=1500):"
0050:                    + "pbcast.STABLE(desired_avg_gossip=20000):"
0051:                    + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
0052:                    + "UNICAST(timeout=5000):"
0053:                    + "FRAG(frag_size=16000;down_thread=false;up_thread=false):"
0054:                    + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
0055:                    + "shun=false;print_local_addr=true):"
0056:                    + "pbcast.STATE_TRANSFER";
0057:            // "PERF(details=true)";
0058:
0059:            /** Determines whether updates have to be sent across the network; avoids sending unnecessary
0060:             * messages when there are no other members in the group */
0061:            private boolean send_message = false;
0062:
0063:            public interface ReplicatedTreeListener {
0064:                void nodeAdded(String fqn);
0065:
0066:                void nodeRemoved(String fqn);
0067:
0068:                void nodeModified(String fqn);
0069:
0070:                void viewChange(View new_view); // might be MergeView after merging
0071:            }
0072:
/**
 * Creates a channel from the given properties, connects it to the group and
 * then calls {@link #start()}. JMX registration is not performed.
 * @param groupname Name of the group to connect to; the default name is kept when null
 * @param props Channel properties; the default protocol stack is kept when null
 * @param state_fetch_timeout Timeout passed to the state fetch performed in start()
 */
public ReplicatedTree(String groupname, String props,
        long state_fetch_timeout) throws Exception {
    // Same sequence as the four-argument constructor with the JMX branch disabled.
    this(groupname, props, state_fetch_timeout, false);
}
0088:
/**
 * Creates a channel from the given properties, connects it to the group,
 * optionally registers the channel with an MBeanServer and then calls
 * {@link #start()}.
 * @param groupname Name of the group to connect to; the default name is kept when null
 * @param props Channel properties; the default protocol stack is kept when null
 * @param state_fetch_timeout Timeout passed to the state fetch performed in start()
 * @param jmx Whether the channel should be registered with an MBeanServer
 * @throws Exception If the channel cannot be created or connected, if JMX registration
 *                   was requested but failed, or if start() fails
 */
public ReplicatedTree(String groupname, String props,
        long state_fetch_timeout, boolean jmx) throws Exception {
    if (groupname != null)
        this.groupname = groupname;
    if (props != null)
        this.props = props;
    this.jmx = jmx;
    this.state_fetch_timeout = state_fetch_timeout;
    channel = new JChannel(this.props);
    channel.connect(this.groupname);
    if (jmx) {
        try {
            MBeanServer server = Util.getMBeanServer();
            if (server == null)
                throw new Exception(
                        "No MBeanServers found; need to run with an MBeanServer present, or inside JDK 5");
            JmxConfigurator.registerChannel(channel, server, "jgroups",
                    channel.getClusterName(), true);
        } catch (Exception e) {
            // The caller never receives this instance when the constructor throws,
            // so the already connected channel has to be released here.
            channel.close();
            channel = null;
            throw e;
        }
    }
    start();
}
0109:
0110:            public ReplicatedTree() {
0111:            }
0112:
/**
 * Uses the given channel, which is expected to be connected already, and
 * calls {@link #start()}.
 * @param channel The channel to run on
 */
public ReplicatedTree(JChannel channel) throws Exception {
    this.channel = channel;
    this.start();
}
0120:
0121:            public void setRemoteCalls(boolean flag) {
0122:                remote_calls = flag;
0123:            }
0124:
0125:            public void setRootNode(Node n) {
0126:                root = n;
0127:            }
0128:
0129:            public Address getLocalAddress() {
0130:                return channel != null ? channel.getLocalAddress() : null;
0131:            }
0132:
0133:            public Vector getMembers() {
0134:                return members;
0135:            }
0136:
0137:            /**
0138:             * Fetch the group state from the current coordinator. If successful, this will trigger setState().
0139:             */
0140:            public void fetchState(long timeout) throws ChannelClosedException,
0141:                    ChannelNotConnectedException {
0142:                boolean rc = channel.getState(null, timeout);
0143:                if (log.isInfoEnabled()) {
0144:                    if (rc)
0145:                        log.info("state was retrieved successfully");
0146:                    else
0147:                        log.info("state could not be retrieved (first member)");
0148:                }
0149:            }
0150:
0151:            public void addReplicatedTreeListener(
0152:                    ReplicatedTreeListener listener) {
0153:                if (!listeners.contains(listener))
0154:                    listeners.addElement(listener);
0155:            }
0156:
0157:            public void removeReplicatedTreeListener(
0158:                    ReplicatedTreeListener listener) {
0159:                listeners.removeElement(listener);
0160:            }
0161:
0162:            public final void start() throws Exception {
0163:                if (request_handler == null) {
0164:                    request_handler = new Thread(this ,
0165:                            "ReplicatedTree.RequestHandler thread");
0166:                    request_handler.setDaemon(true);
0167:                    request_handler.start();
0168:                }
0169:                adapter = new PullPushAdapter(channel, this , this );
0170:                adapter.setListener(this );
0171:                boolean rc = channel.getState(null, state_fetch_timeout);
0172:
0173:                if (log.isInfoEnabled()) {
0174:                    if (rc)
0175:                        log.info("state was retrieved successfully");
0176:                    else
0177:                        log.info("state could not be retrieved (first member)");
0178:                }
0179:            }
0180:
0181:            public void stop() {
0182:                if (request_handler != null && request_handler.isAlive()) {
0183:                    request_queue.close(true);
0184:                    request_handler = null;
0185:                }
0186:
0187:                request_handler = null;
0188:                if (channel != null) {
0189:                    channel.close();
0190:                }
0191:                if (adapter != null) {
0192:                    adapter.stop();
0193:                    adapter = null;
0194:                }
0195:                channel = null;
0196:            }
0197:
0198:            /**
0199:             * Adds a new node to the tree and sets its data. If the node doesn not yet exist, it will be created.
0200:             * Also, parent nodes will be created if not existent. If the node already has data, then the new data
0201:             * will override the old one. If the node already existed, a nodeModified() notification will be generated.
0202:             * Otherwise a nodeCreated() motification will be emitted.
0203:             * @param fqn The fully qualified name of the new node
0204:             * @param data The new data. May be null if no data should be set in the node.
0205:             */
0206:            public void put(String fqn, HashMap data) {
0207:                if (!remote_calls) {
0208:                    _put(fqn, data);
0209:                    return;
0210:                }
0211:
0212:                //Changes done by <aos>
0213:                //if true, propagate action to the group
0214:                if (send_message == true) {
0215:                    if (channel == null) {
0216:                        if (log.isErrorEnabled())
0217:                            log
0218:                                    .error("channel is null, cannot broadcast PUT request");
0219:                        return;
0220:                    }
0221:                    try {
0222:                        channel.send(new Message(null, null, new Request(
0223:                                Request.PUT, fqn, data)));
0224:                    } catch (Exception ex) {
0225:                        if (log.isErrorEnabled())
0226:                            log.error("failure bcasting PUT request: " + ex);
0227:                    }
0228:                } else {
0229:                    _put(fqn, data);
0230:                }
0231:            }
0232:
0233:            /**
0234:             * Adds a key and value to a given node. If the node doesn't exist, it will be created. If the node
0235:             * already existed, a nodeModified() notification will be generated. Otherwise a
0236:             * nodeCreated() motification will be emitted.
0237:             * @param fqn The fully qualified name of the node
0238:             * @param key The key
0239:             * @param value The value
0240:             */
0241:            public void put(String fqn, String key, Object value) {
0242:                if (!remote_calls) {
0243:                    _put(fqn, key, value);
0244:                    return;
0245:                }
0246:
0247:                //Changes done by <aos>
0248:                //if true, propagate action to the group
0249:                if (send_message == true) {
0250:
0251:                    if (channel == null) {
0252:                        if (log.isErrorEnabled())
0253:                            log
0254:                                    .error("channel is null, cannot broadcast PUT request");
0255:                        return;
0256:                    }
0257:                    try {
0258:                        channel.send(new Message(null, null, new Request(
0259:                                Request.PUT, fqn, key, value)));
0260:                    } catch (Exception ex) {
0261:                        if (log.isErrorEnabled())
0262:                            log.error("failure bcasting PUT request: " + ex);
0263:                    }
0264:                } else {
0265:                    _put(fqn, key, value);
0266:                }
0267:            }
0268:
0269:            /**
0270:             * Removes the node from the tree.
0271:             * @param fqn The fully qualified name of the node.
0272:             */
0273:            public void remove(String fqn) {
0274:                if (!remote_calls) {
0275:                    _remove(fqn);
0276:                    return;
0277:                }
0278:                //Changes done by <aos>
0279:                //if true, propagate action to the group
0280:                if (send_message == true) {
0281:                    if (channel == null) {
0282:                        if (log.isErrorEnabled())
0283:                            log
0284:                                    .error("channel is null, cannot broadcast REMOVE request");
0285:                        return;
0286:                    }
0287:                    try {
0288:                        channel.send(new Message(null, null, new Request(
0289:                                Request.REMOVE, fqn)));
0290:                    } catch (Exception ex) {
0291:                        if (log.isErrorEnabled())
0292:                            log.error("failure bcasting REMOVE request: " + ex);
0293:                    }
0294:                } else {
0295:                    _remove(fqn);
0296:                }
0297:            }
0298:
0299:            /**
0300:             * Removes <code>key</code> from the node's hashmap
0301:             * @param fqn The fullly qualified name of the node
0302:             * @param key The key to be removed
0303:             */
0304:            public void remove(String fqn, String key) {
0305:                if (!remote_calls) {
0306:                    _remove(fqn, key);
0307:                    return;
0308:                }
0309:                //Changes done by <aos>
0310:                //if true, propagate action to the group
0311:                if (send_message == true) {
0312:                    if (channel == null) {
0313:                        if (log.isErrorEnabled())
0314:                            log
0315:                                    .error("channel is null, cannot broadcast REMOVE request");
0316:                        return;
0317:                    }
0318:                    try {
0319:                        channel.send(new Message(null, null, new Request(
0320:                                Request.REMOVE, fqn, key)));
0321:                    } catch (Exception ex) {
0322:                        if (log.isErrorEnabled())
0323:                            log.error("failure bcasting REMOVE request: " + ex);
0324:                    }
0325:                } else {
0326:                    _remove(fqn, key);
0327:                }
0328:            }
0329:
0330:            /**
0331:             * Checks whether a given node exists in the tree
0332:             * @param fqn The fully qualified name of the node
0333:             * @return boolean Whether or not the node exists
0334:             */
0335:            public boolean exists(String fqn) {
0336:                if (fqn == null)
0337:                    return false;
0338:                return findNode(fqn) != null;
0339:            }
0340:
0341:            /**
0342:             * Gets the keys of the <code>data</code> map. Returns all keys as Strings. Returns null if node
0343:             * does not exist.
0344:             * @param fqn The fully qualified name of the node
0345:             * @return Set A set of keys (as Strings)
0346:             */
0347:            public Set getKeys(String fqn) {
0348:                Node n = findNode(fqn);
0349:                Map data;
0350:
0351:                if (n == null)
0352:                    return null;
0353:                data = n.getData();
0354:                if (data == null)
0355:                    return null;
0356:                return data.keySet();
0357:            }
0358:
0359:            /**
0360:             * Finds a node given its name and returns the value associated with a given key in its <code>data</code>
0361:             * map. Returns null if the node was not found in the tree or the key was not found in the hashmap.
0362:             * @param fqn The fully qualified name of the node.
0363:             * @param key The key.
0364:             */
0365:            public Object get(String fqn, String key) {
0366:                Node n = findNode(fqn);
0367:
0368:                if (n == null)
0369:                    return null;
0370:                return n.getData(key);
0371:            }
0372:
0373:            /**
0374:             * Returns the data hashmap for a given node. This method can only be used by callers that are inside
0375:             * the same package. The reason is that callers must not modify the return value, as these modifications
0376:             * would not be replicated, thus rendering the replicas inconsistent.
0377:             * @param fqn The fully qualified name of the node
0378:             * @return HashMap The data hashmap for the given node
0379:             */
0380:            HashMap get(String fqn) {
0381:                Node n = findNode(fqn);
0382:
0383:                if (n == null)
0384:                    return null;
0385:                return n.getData();
0386:            }
0387:
0388:            /**
0389:             * Prints a representation of the node defined by <code>fqn</code>. Output includes name, fqn and
0390:             * data.
0391:             */
0392:            public String print(String fqn) {
0393:                Node n = findNode(fqn);
0394:                if (n == null)
0395:                    return null;
0396:                return n.toString();
0397:            }
0398:
0399:            /**
0400:             * Returns all children of a given node
0401:             * @param fqn The fully qualified name of the node
0402:             * @return Set A list of child names (as Strings)
0403:             */
0404:            public Set getChildrenNames(String fqn) {
0405:                Node n = findNode(fqn);
0406:                Map m;
0407:
0408:                if (n == null)
0409:                    return null;
0410:                m = n.getChildren();
0411:                if (m != null)
0412:                    return m.keySet();
0413:                else
0414:                    return null;
0415:            }
0416:
0417:            public String toString() {
0418:                StringBuffer sb = new StringBuffer();
0419:                int indent = 0;
0420:                Map children;
0421:
0422:                children = root.getChildren();
0423:                if (children != null && children.size() > 0) {
0424:                    Collection nodes = children.values();
0425:                    for (Iterator it = nodes.iterator(); it.hasNext();) {
0426:                        ((Node) it.next()).print(sb, indent);
0427:                        sb.append('\n');
0428:                    }
0429:                } else
0430:                    sb.append(SEPARATOR);
0431:                return sb.toString();
0432:            }
0433:
0434:            /**
0435:             * Returns the name of the group that the DistributedTree is connected to
0436:             * @return String
0437:             */
0438:            public String getGroupName() {
0439:                return groupname;
0440:            }
0441:
0442:            /**
0443:             * Returns the Channel the DistributedTree is connected to 
0444:             * @return Channel
0445:             */
0446:            public Channel getChannel() {
0447:                return channel;
0448:            }
0449:
0450:            /**
0451:             * Returns the number of current members joined to the group
0452:             * @return int
0453:             */
0454:            public int getGroupMembersNumber() {
0455:                return members.size();
0456:            }
0457:
0458:            /* --------------------- Callbacks -------------------------- */
0459:
0460:            public void _put(String fqn, HashMap data) {
0461:                Node n;
0462:                StringHolder child_name = new StringHolder();
0463:                boolean child_exists = false;
0464:
0465:                if (fqn == null)
0466:                    return;
0467:                n = findParentNode(fqn, child_name, true); // create all nodes if they don't exist
0468:                if (child_name.getValue() != null) {
0469:                    child_exists = n.childExists(child_name.getValue());
0470:                    n.createChild(child_name.getValue(), fqn, n, data);
0471:                } else {
0472:                    child_exists = true;
0473:                    n.setData(data);
0474:                }
0475:                if (child_exists)
0476:                    notifyNodeModified(fqn);
0477:                else
0478:                    notifyNodeAdded(fqn);
0479:            }
0480:
0481:            public void _put(String fqn, String key, Object value) {
0482:                Node n;
0483:                StringHolder child_name = new StringHolder();
0484:                boolean child_exists = false;
0485:
0486:                if (fqn == null || key == null || value == null)
0487:                    return;
0488:                n = findParentNode(fqn, child_name, true);
0489:                if (child_name.getValue() != null) {
0490:                    child_exists = n.childExists(child_name.getValue());
0491:                    n.createChild(child_name.getValue(), fqn, n, key, value);
0492:                } else {
0493:                    child_exists = true;
0494:                    n.setData(key, value);
0495:                }
0496:                if (child_exists)
0497:                    notifyNodeModified(fqn);
0498:                else
0499:                    notifyNodeAdded(fqn);
0500:            }
0501:
0502:            public void _remove(String fqn) {
0503:                Node n;
0504:                StringHolder child_name = new StringHolder();
0505:
0506:                if (fqn == null)
0507:                    return;
0508:                if (fqn.equals(SEPARATOR)) {
0509:                    root.removeAll();
0510:                    notifyNodeRemoved(fqn);
0511:                    return;
0512:                }
0513:                n = findParentNode(fqn, child_name, false);
0514:                if (n == null)
0515:                    return;
0516:                n.removeChild(child_name.getValue(), fqn);
0517:                notifyNodeRemoved(fqn);
0518:            }
0519:
0520:            public void _remove(String fqn, String key) {
0521:                Node n;
0522:
0523:                if (fqn == null || key == null)
0524:                    return;
0525:                n = findNode(fqn);
0526:                if (n != null)
0527:                    n.removeData(key);
0528:            }
0529:
0530:            public void _removeData(String fqn) {
0531:                Node n;
0532:
0533:                if (fqn == null)
0534:                    return;
0535:                n = findNode(fqn);
0536:                if (n != null)
0537:                    n.removeData();
0538:            }
0539:
0540:            /* ----------------- End of  Callbacks ---------------------- */
0541:
0542:            /*-------------------- MessageListener ----------------------*/
0543:
0544:            /** Callback. Process the contents of the message; typically an _add() or _set() request */
0545:            public void receive(Message msg) {
0546:                Request req = null;
0547:
0548:                if (msg == null || msg.getLength() == 0)
0549:                    return;
0550:                try {
0551:                    req = (Request) msg.getObject();
0552:                    request_queue.add(req);
0553:                } catch (QueueClosedException queue_closed_ex) {
0554:                    if (log.isErrorEnabled())
0555:                        log.error("request queue is null");
0556:                } catch (Exception ex) {
0557:                    if (log.isErrorEnabled())
0558:                        log.error("failed unmarshalling request: " + ex);
0559:                }
0560:            }
0561:
0562:            /** Return a copy of the current cache (tree) */
0563:            public byte[] getState() {
0564:                try {
0565:                    return Util.objectToByteBuffer(root.clone());
0566:                } catch (Throwable ex) {
0567:                    if (log.isErrorEnabled())
0568:                        log.error("exception returning cache: " + ex);
0569:                    return null;
0570:                }
0571:            }
0572:
0573:            /** Set the cache (tree) to this value */
0574:            public void setState(byte[] new_state) {
0575:                Node new_root = null;
0576:                Object obj;
0577:
0578:                if (new_state == null) {
0579:                    if (log.isInfoEnabled())
0580:                        log.info("new cache is null");
0581:                    return;
0582:                }
0583:                try {
0584:                    obj = Util.objectFromByteBuffer(new_state);
0585:                    new_root = (Node) ((Node) obj).clone();
0586:                    root = new_root;
0587:                    notifyAllNodesCreated(root);
0588:                } catch (Throwable ex) {
0589:                    if (log.isErrorEnabled())
0590:                        log.error("could not set cache: " + ex);
0591:                }
0592:            }
0593:
0594:            /*-------------------- End of MessageListener ----------------------*/
0595:
0596:            /*----------------------- MembershipListener ------------------------*/
0597:
0598:            public void viewAccepted(View new_view) {
0599:                Vector new_mbrs = new_view.getMembers();
0600:
0601:                // todo: if MergeView, fetch and reconcile state from coordinator
0602:                // actually maybe this is best left up to the application ? we just notify them and let
0603:                // the appl handle it ?
0604:
0605:                if (new_mbrs != null) {
0606:                    notifyViewChange(new_view);
0607:                    members.removeAllElements();
0608:                    for (int i = 0; i < new_mbrs.size(); i++)
0609:                        members.addElement(new_mbrs.elementAt(i));
0610:                }
0611:                //if size is bigger than one, there are more peers in the group
0612:                //otherwise there is only one server.
0613:                send_message = members.size() > 1;
0614:            }
0615:
0616:            /** Called when a member is suspected */
0617:            public void suspect(Address suspected_mbr) {
0618:                ;
0619:            }
0620:
0621:            /** Block sending and receiving of messages until viewAccepted() is called */
0622:            public void block() {
0623:            }
0624:
0625:            /*------------------- End of MembershipListener ----------------------*/
0626:
0627:            /** Request handler thread */
0628:            public void run() {
0629:                Request req;
0630:                String fqn = null;
0631:
0632:                while (request_handler != null) {
0633:                    try {
0634:                        req = (Request) request_queue.remove(0);
0635:                        fqn = req.fqn;
0636:                        switch (req.type) {
0637:                        case Request.PUT:
0638:                            if (req.key != null && req.value != null)
0639:                                _put(fqn, req.key, req.value);
0640:                            else
0641:                                _put(fqn, req.data);
0642:                            break;
0643:                        case Request.REMOVE:
0644:                            if (req.key != null)
0645:                                _remove(fqn, req.key);
0646:                            else
0647:                                _remove(fqn);
0648:                            break;
0649:                        default:
0650:                            if (log.isErrorEnabled())
0651:                                log.error("type " + req.type + " unknown");
0652:                            break;
0653:                        }
0654:                    } catch (QueueClosedException queue_closed_ex) {
0655:                        request_handler = null;
0656:                        break;
0657:                    } catch (Throwable other_ex) {
0658:                        if (log.isWarnEnabled())
0659:                            log.warn("exception processing request: "
0660:                                    + other_ex);
0661:                    }
0662:                }
0663:            }
0664:
0665:            /**
0666:             * Find the node just <em>above</em> the one indicated by <code>fqn</code>. This is needed in many cases,
0667:             * e.g. to add a new node or remove an existing node.
0668:             * @param fqn The fully qualified name of the node.
0669:             * @param child_name Will be filled with the name of the child when this method returns. The child name
0670:             *                   is the last relative name of the <code>fqn</code>, e.g. in "/a/b/c" it would be "c".
0671:             * @param create_if_not_exists Create parent nodes along the way if they don't exist. Otherwise, this method
0672:             *                             will return when a node cannot be found.
0673:             */
0674:            Node findParentNode(String fqn, StringHolder child_name,
0675:                    boolean create_if_not_exists) {
0676:                Node curr = root, node;
0677:                StringTokenizer tok;
0678:                String name;
0679:                StringBuffer sb = null;
0680:
0681:                if (fqn == null || fqn.equals(SEPARATOR) || "".equals(fqn))
0682:                    return curr;
0683:
0684:                sb = new StringBuffer();
0685:                tok = new StringTokenizer(fqn, SEPARATOR);
0686:                while (tok.countTokens() > 1) {
0687:                    name = tok.nextToken();
0688:                    sb.append(SEPARATOR).append(name);
0689:                    node = curr.getChild(name);
0690:                    if (node == null && create_if_not_exists)
0691:                        node = curr
0692:                                .createChild(name, sb.toString(), null, null);
0693:                    if (node == null)
0694:                        return null;
0695:                    else
0696:                        curr = node;
0697:                }
0698:
0699:                if (tok.countTokens() > 0 && child_name != null)
0700:                    child_name.setValue(tok.nextToken());
0701:                return curr;
0702:            }
0703:
0704:            /**
0705:             * Returns the node at fqn. This method should not be used by clients (therefore it is package-private):
0706:             * it is only used internally (for navigation). C++ 'friend' would come in handy here...
0707:             * @param fqn The fully qualified name of the node
0708:             * @return Node The node at fqn
0709:             */
0710:            Node findNode(String fqn) {
0711:                StringHolder sh = new StringHolder();
0712:                Node n = findParentNode(fqn, sh, false);
0713:                String child_name = sh.getValue();
0714:
0715:                if (fqn == null || fqn.equals(SEPARATOR) || "".equals(fqn))
0716:                    return root;
0717:
0718:                if (n == null || child_name == null)
0719:                    return null;
0720:                else
0721:                    return n.getChild(child_name);
0722:            }
0723:
0724:            void notifyNodeAdded(String fqn) {
0725:                for (int i = 0; i < listeners.size(); i++)
0726:                    ((ReplicatedTreeListener) listeners.elementAt(i))
0727:                            .nodeAdded(fqn);
0728:            }
0729:
0730:            void notifyNodeRemoved(String fqn) {
0731:                for (int i = 0; i < listeners.size(); i++)
0732:                    ((ReplicatedTreeListener) listeners.elementAt(i))
0733:                            .nodeRemoved(fqn);
0734:            }
0735:
0736:            void notifyNodeModified(String fqn) {
0737:                for (int i = 0; i < listeners.size(); i++)
0738:                    ((ReplicatedTreeListener) listeners.elementAt(i))
0739:                            .nodeModified(fqn);
0740:            }
0741:
0742:            void notifyViewChange(View v) {
0743:                for (int i = 0; i < listeners.size(); i++)
0744:                    ((ReplicatedTreeListener) listeners.elementAt(i))
0745:                            .viewChange(v);
0746:            }
0747:
0748:            /** Generates NodeAdded notifications for all nodes of the tree. This is called whenever the tree is
0749:             initially retrieved (state transfer) */
0750:            void notifyAllNodesCreated(Node curr) {
0751:                Node n;
0752:                Map children;
0753:
0754:                if (curr == null)
0755:                    return;
0756:                notifyNodeAdded(curr.fqn);
0757:                if ((children = curr.getChildren()) != null) {
0758:                    for (Iterator it = children.values().iterator(); it
0759:                            .hasNext();) {
0760:                        n = (Node) it.next();
0761:                        notifyAllNodesCreated(n);
0762:                    }
0763:                }
0764:            }
0765:
0766:            public static class Node implements  Serializable {
0767:                String name = null; // relative name (e.g. "Security")
0768:                String fqn = null; // fully qualified name (e.g. "/federations/fed1/servers/Security")
0769:                Node parent = null; // parent node
0770:                TreeMap children = null; // keys: child name, value: Node
0771:                HashMap data = null; // data for current node
0772:                private static final long serialVersionUID = -3077676554440038890L;
0773:
0774:                // Address       creator=null;  // member that created this node (needed ?)
0775:
0776:                private Node(String child_name, String fqn, Node parent,
0777:                        HashMap data) {
0778:                    name = child_name;
0779:                    this .fqn = fqn;
0780:                    this .parent = parent;
0781:                    if (data != null)
0782:                        this .data = (HashMap) data.clone();
0783:                }
0784:
0785:                private Node(String child_name, String fqn, Node parent,
0786:                        String key, Object value) {
0787:                    name = child_name;
0788:                    this .fqn = fqn;
0789:                    this .parent = parent;
0790:                    if (data == null)
0791:                        data = new HashMap();
0792:                    data.put(key, value);
0793:                }
0794:
0795:                void setData(Map data) {
0796:                    if (data == null)
0797:                        return;
0798:                    if (this .data == null)
0799:                        this .data = new HashMap();
0800:                    this .data.putAll(data);
0801:                }
0802:
0803:                void setData(String key, Object value) {
0804:                    if (this .data == null)
0805:                        this .data = new HashMap();
0806:                    this .data.put(key, value);
0807:                }
0808:
0809:                HashMap getData() {
0810:                    return data;
0811:                }
0812:
0813:                Object getData(String key) {
0814:                    return data != null ? data.get(key) : null;
0815:                }
0816:
0817:                boolean childExists(String child_name) {
0818:                    if (child_name == null)
0819:                        return false;
0820:                    return children != null && children.containsKey(child_name);
0821:                }
0822:
0823:                Node createChild(String child_name, String fqn, Node parent,
0824:                        HashMap data) {
0825:                    Node child = null;
0826:
0827:                    if (child_name == null)
0828:                        return null;
0829:                    if (children == null)
0830:                        children = new TreeMap();
0831:                    child = (Node) children.get(child_name);
0832:                    if (child != null)
0833:                        child.setData(data);
0834:                    else {
0835:                        child = new Node(child_name, fqn, parent, data);
0836:                        children.put(child_name, child);
0837:                    }
0838:                    return child;
0839:                }
0840:
0841:                Node createChild(String child_name, String fqn, Node parent,
0842:                        String key, Object value) {
0843:                    Node child = null;
0844:
0845:                    if (child_name == null)
0846:                        return null;
0847:                    if (children == null)
0848:                        children = new TreeMap();
0849:                    child = (Node) children.get(child_name);
0850:                    if (child != null)
0851:                        child.setData(key, value);
0852:                    else {
0853:                        child = new Node(child_name, fqn, parent, key, value);
0854:                        children.put(child_name, child);
0855:                    }
0856:                    return child;
0857:                }
0858:
0859:                Node getChild(String child_name) {
0860:                    return child_name == null ? null : children == null ? null
0861:                            : (Node) children.get(child_name);
0862:                }
0863:
0864:                Map getChildren() {
0865:                    return children;
0866:                }
0867:
0868:                void removeData(String key) {
0869:                    if (data != null)
0870:                        data.remove(key);
0871:                }
0872:
0873:                void removeData() {
0874:                    if (data != null)
0875:                        data.clear();
0876:                }
0877:
0878:                void removeChild(String child_name, String fqn) {
0879:                    if (child_name != null && children != null
0880:                            && children.containsKey(child_name)) {
0881:                        children.remove(child_name);
0882:                    }
0883:                }
0884:
0885:                void removeAll() {
0886:                    if (children != null)
0887:                        children.clear();
0888:                }
0889:
0890:                void print(StringBuffer sb, int indent) {
0891:                    printIndent(sb, indent);
0892:                    sb.append(SEPARATOR).append(name);
0893:                    if (children != null && children.size() > 0) {
0894:                        Collection values = children.values();
0895:                        for (Iterator it = values.iterator(); it.hasNext();) {
0896:                            sb.append('\n');
0897:                            ((Node) it.next()).print(sb, indent + INDENT);
0898:                        }
0899:                    }
0900:                }
0901:
0902:                void printIndent(StringBuffer sb, int indent) {
0903:                    if (sb != null) {
0904:                        for (int i = 0; i < indent; i++)
0905:                            sb.append(' ');
0906:                    }
0907:                }
0908:
0909:                public String toString() {
0910:                    StringBuffer sb = new StringBuffer();
0911:                    if (name != null)
0912:                        sb.append("\nname=" + name);
0913:                    if (fqn != null)
0914:                        sb.append("\nfqn=" + fqn);
0915:                    if (data != null)
0916:                        sb.append("\ndata=" + data);
0917:                    return sb.toString();
0918:                }
0919:
0920:                public Object clone() throws CloneNotSupportedException {
0921:                    Node n = new Node(name, fqn, parent != null ? (Node) parent
0922:                            .clone() : null, data);
0923:                    if (children != null)
0924:                        n.children = (TreeMap) children.clone();
0925:                    return n;
0926:                }
0927:
0928:            }
0929:
0930:            private static class StringHolder {
0931:                String s = null;
0932:
0933:                private StringHolder() {
0934:                }
0935:
0936:                void setValue(String s) {
0937:                    this .s = s;
0938:                }
0939:
0940:                String getValue() {
0941:                    return s;
0942:                }
0943:            }
0944:
0945:            /**
0946:             * Class used to multicast add(), remove() and set() methods to all members.
0947:             */
0948:            private static class Request implements  Serializable {
0949:                static final int PUT = 1;
0950:                static final int REMOVE = 2;
0951:
0952:                int type = 0;
0953:                String fqn = null;
0954:                String key = null;
0955:                Object value = null;
0956:                HashMap data = null;
0957:                private static final long serialVersionUID = 7772753222127676782L;
0958:
0959:                private Request(int type, String fqn) {
0960:                    this .type = type;
0961:                    this .fqn = fqn;
0962:                }
0963:
0964:                private Request(int type, String fqn, HashMap data) {
0965:                    this (type, fqn);
0966:                    this .data = data;
0967:                }
0968:
0969:                private Request(int type, String fqn, String key) {
0970:                    this (type, fqn);
0971:                    this .key = key;
0972:                }
0973:
0974:                private Request(int type, String fqn, String key, Object value) {
0975:                    this (type, fqn);
0976:                    this .key = key;
0977:                    this .value = value;
0978:                }
0979:
0980:                public String toString() {
0981:                    StringBuffer sb = new StringBuffer();
0982:                    sb.append(type2String(type)).append(" (");
0983:                    if (fqn != null)
0984:                        sb.append(" fqn=" + fqn);
0985:                    switch (type) {
0986:                    case PUT:
0987:                        if (data != null)
0988:                            sb.append(", data=" + data);
0989:                        if (key != null)
0990:                            sb.append(", key=" + key);
0991:                        if (value != null)
0992:                            sb.append(", value=" + value);
0993:                        break;
0994:                    case REMOVE:
0995:                        if (key != null)
0996:                            sb.append(", key=" + key);
0997:                        break;
0998:                    default:
0999:                        break;
1000:                    }
1001:                    sb.append(')');
1002:                    return sb.toString();
1003:                }
1004:
1005:                static String type2String(int t) {
1006:                    switch (t) {
1007:                    case PUT:
1008:                        return "PUT";
1009:                    case REMOVE:
1010:                        return "REMOVE";
1011:                    default:
1012:                        return "UNKNOWN";
1013:                    }
1014:                }
1015:
1016:            }
1017:
1018:            public static void main(String[] args) {
1019:
1020:                ReplicatedTree tree = null;
1021:                HashMap m = new HashMap();
1022:                String props;
1023:
1024:                props = "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;"
1025:                        + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
1026:                        + "PING(timeout=2000;num_initial_members=3):"
1027:                        + "MERGE2(min_interval=5000;max_interval=10000):"
1028:                        + "FD_SOCK:"
1029:                        + "VERIFY_SUSPECT(timeout=1500):"
1030:                        + "pbcast.STABLE(desired_avg_gossip=20000):"
1031:                        + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
1032:                        + "UNICAST(timeout=5000):"
1033:                        + "FRAG(frag_size=16000;down_thread=false;up_thread=false):"
1034:                        + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
1035:                        + "shun=false;print_local_addr=true):"
1036:                        + "pbcast.STATE_TRANSFER";
1037:                // "PERF(details=true)";
1038:
1039:                try {
1040:
1041:                    tree = new ReplicatedTree(null, props, 10000);
1042:                    // tree.setRemoteCalls(false);
1043:                    tree.addReplicatedTreeListener(new MyListener());
1044:                    tree.put("/a/b/c", null);
1045:                    tree.put("/a/b/c1", null);
1046:                    tree.put("/a/b/c2", null);
1047:                    tree.put("/a/b1/chat", null);
1048:                    tree.put("/a/b1/chat2", null);
1049:                    tree.put("/a/b1/chat5", null);
1050:                    System.out.println(tree);
1051:                    m.put("name", "Bela Ban");
1052:                    m.put("age", new Integer(36));
1053:                    m.put("cube", "240-17");
1054:                    tree.put("/a/b/c", m);
1055:                    System.out.println("info for for \"/a/b/c\" is "
1056:                            + tree.print("/a/b/c"));
1057:                    tree.put("/a/b/c", "age", new Integer(37));
1058:                    System.out.println("info for for \"/a/b/c\" is "
1059:                            + tree.print("/a/b/c"));
1060:                    tree.remove("/a/b");
1061:                    System.out.println(tree);
1062:                } catch (Exception ex) {
1063:                    System.err.println(ex);
1064:                }
1065:            }
1066:
1067:            static class MyListener implements  ReplicatedTreeListener {
1068:
1069:                public void nodeAdded(String fqn) {
1070:                    System.out.println("** node added: " + fqn);
1071:                }
1072:
1073:                public void nodeRemoved(String fqn) {
1074:                    System.out.println("** node removed: " + fqn);
1075:                }
1076:
1077:                public void nodeModified(String fqn) {
1078:                    System.out.println("** node modified: " + fqn);
1079:                }
1080:
1081:                public void viewChange(View new_view) {
1082:                    System.out.println("** view change: " + new_view);
1083:                }
1084:
1085:            }
1086:
1087:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.