0001: // $Id: ReplicatedTree.java,v 1.15 2006/07/31 09:21:58 belaban Exp $
0002:
0003: package org.jgroups.blocks;
0004:
0005: import org.apache.commons.logging.Log;
0006: import org.apache.commons.logging.LogFactory;
0007: import org.jgroups.*;
0008: import org.jgroups.jmx.JmxConfigurator;
0009: import org.jgroups.util.Queue;
0010: import org.jgroups.util.QueueClosedException;
0011: import org.jgroups.util.Util;
0012:
0013: import javax.management.MBeanServer;
0014: import java.io.Serializable;
0015: import java.util.*;
0016:
0017: /**
0018: * A tree-like structure that is replicated across several members. Updates will be multicast to all group
0019: * members reliably and in the same order.
0020: * @author Bela Ban Jan 17 2002
0021: * @author <a href="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a>
0022: */
0023: public class ReplicatedTree implements Runnable, MessageListener,
0024: MembershipListener {
0025: public static final String SEPARATOR = "/";
0026: final static int INDENT = 4;
0027: Node root = new Node(SEPARATOR, SEPARATOR, null, null);
0028: final Vector listeners = new Vector();
0029: final Queue request_queue = new Queue();
0030: Thread request_handler = null;
0031: JChannel channel = null;
0032: PullPushAdapter adapter = null;
0033: String groupname = "ReplicatedTree-Group";
0034: final Vector members = new Vector();
0035: long state_fetch_timeout = 10000;
0036: boolean jmx = false;
0037:
0038: protected final Log log = LogFactory.getLog(this .getClass());
0039:
0040: /** Whether or not to use remote calls. If false, all methods will be invoked directly on this
0041: instance rather than sending a message to all replicas and only then invoking the method.
0042: Useful for testing */
0043: boolean remote_calls = true;
0044: String props = "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;"
0045: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
0046: + "PING(timeout=2000;num_initial_members=3):"
0047: + "MERGE2(min_interval=5000;max_interval=10000):"
0048: + "FD_SOCK:"
0049: + "VERIFY_SUSPECT(timeout=1500):"
0050: + "pbcast.STABLE(desired_avg_gossip=20000):"
0051: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
0052: + "UNICAST(timeout=5000):"
0053: + "FRAG(frag_size=16000;down_thread=false;up_thread=false):"
0054: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
0055: + "shun=false;print_local_addr=true):"
0056: + "pbcast.STATE_TRANSFER";
0057: // "PERF(details=true)";
0058:
0059: /** Determines when the updates have to be sent across the network, avoids sending unnecessary
0060: * messages when there are no member in the group */
0061: private boolean send_message = false;
0062:
0063: public interface ReplicatedTreeListener {
0064: void nodeAdded(String fqn);
0065:
0066: void nodeRemoved(String fqn);
0067:
0068: void nodeModified(String fqn);
0069:
0070: void viewChange(View new_view); // might be MergeView after merging
0071: }
0072:
/**
 * Creates a channel with the given properties, connects it to the group and starts this
 * instance. JMX registration is not performed.
 *
 * @param groupname Name of the group to connect to; the default name is kept if null
 * @param props Channel properties; the default protocol stack is kept if null
 * @param state_fetch_timeout Timeout used for the initial state retrieval
 * @throws Exception if the channel cannot be created or connected, or start() fails
 */
public ReplicatedTree(String groupname, String props,
        long state_fetch_timeout) throws Exception {
    // Same sequence as the JMX-aware constructor with registration switched off.
    this(groupname, props, state_fetch_timeout, false);
}
0088:
0089: public ReplicatedTree(String groupname, String props,
0090: long state_fetch_timeout, boolean jmx) throws Exception {
0091: if (groupname != null)
0092: this .groupname = groupname;
0093: if (props != null)
0094: this .props = props;
0095: this .jmx = jmx;
0096: this .state_fetch_timeout = state_fetch_timeout;
0097: channel = new JChannel(this .props);
0098: channel.connect(this .groupname);
0099: if (jmx) {
0100: MBeanServer server = Util.getMBeanServer();
0101: if (server == null)
0102: throw new Exception(
0103: "No MBeanServers found; need to run with an MBeanServer present, or inside JDK 5");
0104: JmxConfigurator.registerChannel(channel, server, "jgroups",
0105: channel.getClusterName(), true);
0106: }
0107: start();
0108: }
0109:
0110: public ReplicatedTree() {
0111: }
0112:
/**
 * Uses the given channel, which is expected to be connected already, and starts this
 * instance (request handler thread, PullPushAdapter and initial state retrieval).
 *
 * @param channel The channel to use
 * @throws Exception if start() fails
 */
public ReplicatedTree(JChannel channel) throws Exception {
    this.channel = channel;
    start();
}
0120:
0121: public void setRemoteCalls(boolean flag) {
0122: remote_calls = flag;
0123: }
0124:
0125: public void setRootNode(Node n) {
0126: root = n;
0127: }
0128:
0129: public Address getLocalAddress() {
0130: return channel != null ? channel.getLocalAddress() : null;
0131: }
0132:
0133: public Vector getMembers() {
0134: return members;
0135: }
0136:
0137: /**
0138: * Fetch the group state from the current coordinator. If successful, this will trigger setState().
0139: */
0140: public void fetchState(long timeout) throws ChannelClosedException,
0141: ChannelNotConnectedException {
0142: boolean rc = channel.getState(null, timeout);
0143: if (log.isInfoEnabled()) {
0144: if (rc)
0145: log.info("state was retrieved successfully");
0146: else
0147: log.info("state could not be retrieved (first member)");
0148: }
0149: }
0150:
0151: public void addReplicatedTreeListener(
0152: ReplicatedTreeListener listener) {
0153: if (!listeners.contains(listener))
0154: listeners.addElement(listener);
0155: }
0156:
0157: public void removeReplicatedTreeListener(
0158: ReplicatedTreeListener listener) {
0159: listeners.removeElement(listener);
0160: }
0161:
0162: public final void start() throws Exception {
0163: if (request_handler == null) {
0164: request_handler = new Thread(this ,
0165: "ReplicatedTree.RequestHandler thread");
0166: request_handler.setDaemon(true);
0167: request_handler.start();
0168: }
0169: adapter = new PullPushAdapter(channel, this , this );
0170: adapter.setListener(this );
0171: boolean rc = channel.getState(null, state_fetch_timeout);
0172:
0173: if (log.isInfoEnabled()) {
0174: if (rc)
0175: log.info("state was retrieved successfully");
0176: else
0177: log.info("state could not be retrieved (first member)");
0178: }
0179: }
0180:
0181: public void stop() {
0182: if (request_handler != null && request_handler.isAlive()) {
0183: request_queue.close(true);
0184: request_handler = null;
0185: }
0186:
0187: request_handler = null;
0188: if (channel != null) {
0189: channel.close();
0190: }
0191: if (adapter != null) {
0192: adapter.stop();
0193: adapter = null;
0194: }
0195: channel = null;
0196: }
0197:
0198: /**
0199: * Adds a new node to the tree and sets its data. If the node doesn not yet exist, it will be created.
0200: * Also, parent nodes will be created if not existent. If the node already has data, then the new data
0201: * will override the old one. If the node already existed, a nodeModified() notification will be generated.
0202: * Otherwise a nodeCreated() motification will be emitted.
0203: * @param fqn The fully qualified name of the new node
0204: * @param data The new data. May be null if no data should be set in the node.
0205: */
0206: public void put(String fqn, HashMap data) {
0207: if (!remote_calls) {
0208: _put(fqn, data);
0209: return;
0210: }
0211:
0212: //Changes done by <aos>
0213: //if true, propagate action to the group
0214: if (send_message == true) {
0215: if (channel == null) {
0216: if (log.isErrorEnabled())
0217: log
0218: .error("channel is null, cannot broadcast PUT request");
0219: return;
0220: }
0221: try {
0222: channel.send(new Message(null, null, new Request(
0223: Request.PUT, fqn, data)));
0224: } catch (Exception ex) {
0225: if (log.isErrorEnabled())
0226: log.error("failure bcasting PUT request: " + ex);
0227: }
0228: } else {
0229: _put(fqn, data);
0230: }
0231: }
0232:
0233: /**
0234: * Adds a key and value to a given node. If the node doesn't exist, it will be created. If the node
0235: * already existed, a nodeModified() notification will be generated. Otherwise a
0236: * nodeCreated() motification will be emitted.
0237: * @param fqn The fully qualified name of the node
0238: * @param key The key
0239: * @param value The value
0240: */
0241: public void put(String fqn, String key, Object value) {
0242: if (!remote_calls) {
0243: _put(fqn, key, value);
0244: return;
0245: }
0246:
0247: //Changes done by <aos>
0248: //if true, propagate action to the group
0249: if (send_message == true) {
0250:
0251: if (channel == null) {
0252: if (log.isErrorEnabled())
0253: log
0254: .error("channel is null, cannot broadcast PUT request");
0255: return;
0256: }
0257: try {
0258: channel.send(new Message(null, null, new Request(
0259: Request.PUT, fqn, key, value)));
0260: } catch (Exception ex) {
0261: if (log.isErrorEnabled())
0262: log.error("failure bcasting PUT request: " + ex);
0263: }
0264: } else {
0265: _put(fqn, key, value);
0266: }
0267: }
0268:
0269: /**
0270: * Removes the node from the tree.
0271: * @param fqn The fully qualified name of the node.
0272: */
0273: public void remove(String fqn) {
0274: if (!remote_calls) {
0275: _remove(fqn);
0276: return;
0277: }
0278: //Changes done by <aos>
0279: //if true, propagate action to the group
0280: if (send_message == true) {
0281: if (channel == null) {
0282: if (log.isErrorEnabled())
0283: log
0284: .error("channel is null, cannot broadcast REMOVE request");
0285: return;
0286: }
0287: try {
0288: channel.send(new Message(null, null, new Request(
0289: Request.REMOVE, fqn)));
0290: } catch (Exception ex) {
0291: if (log.isErrorEnabled())
0292: log.error("failure bcasting REMOVE request: " + ex);
0293: }
0294: } else {
0295: _remove(fqn);
0296: }
0297: }
0298:
0299: /**
0300: * Removes <code>key</code> from the node's hashmap
0301: * @param fqn The fullly qualified name of the node
0302: * @param key The key to be removed
0303: */
0304: public void remove(String fqn, String key) {
0305: if (!remote_calls) {
0306: _remove(fqn, key);
0307: return;
0308: }
0309: //Changes done by <aos>
0310: //if true, propagate action to the group
0311: if (send_message == true) {
0312: if (channel == null) {
0313: if (log.isErrorEnabled())
0314: log
0315: .error("channel is null, cannot broadcast REMOVE request");
0316: return;
0317: }
0318: try {
0319: channel.send(new Message(null, null, new Request(
0320: Request.REMOVE, fqn, key)));
0321: } catch (Exception ex) {
0322: if (log.isErrorEnabled())
0323: log.error("failure bcasting REMOVE request: " + ex);
0324: }
0325: } else {
0326: _remove(fqn, key);
0327: }
0328: }
0329:
0330: /**
0331: * Checks whether a given node exists in the tree
0332: * @param fqn The fully qualified name of the node
0333: * @return boolean Whether or not the node exists
0334: */
0335: public boolean exists(String fqn) {
0336: if (fqn == null)
0337: return false;
0338: return findNode(fqn) != null;
0339: }
0340:
0341: /**
0342: * Gets the keys of the <code>data</code> map. Returns all keys as Strings. Returns null if node
0343: * does not exist.
0344: * @param fqn The fully qualified name of the node
0345: * @return Set A set of keys (as Strings)
0346: */
0347: public Set getKeys(String fqn) {
0348: Node n = findNode(fqn);
0349: Map data;
0350:
0351: if (n == null)
0352: return null;
0353: data = n.getData();
0354: if (data == null)
0355: return null;
0356: return data.keySet();
0357: }
0358:
0359: /**
0360: * Finds a node given its name and returns the value associated with a given key in its <code>data</code>
0361: * map. Returns null if the node was not found in the tree or the key was not found in the hashmap.
0362: * @param fqn The fully qualified name of the node.
0363: * @param key The key.
0364: */
0365: public Object get(String fqn, String key) {
0366: Node n = findNode(fqn);
0367:
0368: if (n == null)
0369: return null;
0370: return n.getData(key);
0371: }
0372:
0373: /**
0374: * Returns the data hashmap for a given node. This method can only be used by callers that are inside
0375: * the same package. The reason is that callers must not modify the return value, as these modifications
0376: * would not be replicated, thus rendering the replicas inconsistent.
0377: * @param fqn The fully qualified name of the node
0378: * @return HashMap The data hashmap for the given node
0379: */
0380: HashMap get(String fqn) {
0381: Node n = findNode(fqn);
0382:
0383: if (n == null)
0384: return null;
0385: return n.getData();
0386: }
0387:
0388: /**
0389: * Prints a representation of the node defined by <code>fqn</code>. Output includes name, fqn and
0390: * data.
0391: */
0392: public String print(String fqn) {
0393: Node n = findNode(fqn);
0394: if (n == null)
0395: return null;
0396: return n.toString();
0397: }
0398:
0399: /**
0400: * Returns all children of a given node
0401: * @param fqn The fully qualified name of the node
0402: * @return Set A list of child names (as Strings)
0403: */
0404: public Set getChildrenNames(String fqn) {
0405: Node n = findNode(fqn);
0406: Map m;
0407:
0408: if (n == null)
0409: return null;
0410: m = n.getChildren();
0411: if (m != null)
0412: return m.keySet();
0413: else
0414: return null;
0415: }
0416:
0417: public String toString() {
0418: StringBuffer sb = new StringBuffer();
0419: int indent = 0;
0420: Map children;
0421:
0422: children = root.getChildren();
0423: if (children != null && children.size() > 0) {
0424: Collection nodes = children.values();
0425: for (Iterator it = nodes.iterator(); it.hasNext();) {
0426: ((Node) it.next()).print(sb, indent);
0427: sb.append('\n');
0428: }
0429: } else
0430: sb.append(SEPARATOR);
0431: return sb.toString();
0432: }
0433:
0434: /**
0435: * Returns the name of the group that the DistributedTree is connected to
0436: * @return String
0437: */
0438: public String getGroupName() {
0439: return groupname;
0440: }
0441:
0442: /**
0443: * Returns the Channel the DistributedTree is connected to
0444: * @return Channel
0445: */
0446: public Channel getChannel() {
0447: return channel;
0448: }
0449:
0450: /**
0451: * Returns the number of current members joined to the group
0452: * @return int
0453: */
0454: public int getGroupMembersNumber() {
0455: return members.size();
0456: }
0457:
0458: /* --------------------- Callbacks -------------------------- */
0459:
0460: public void _put(String fqn, HashMap data) {
0461: Node n;
0462: StringHolder child_name = new StringHolder();
0463: boolean child_exists = false;
0464:
0465: if (fqn == null)
0466: return;
0467: n = findParentNode(fqn, child_name, true); // create all nodes if they don't exist
0468: if (child_name.getValue() != null) {
0469: child_exists = n.childExists(child_name.getValue());
0470: n.createChild(child_name.getValue(), fqn, n, data);
0471: } else {
0472: child_exists = true;
0473: n.setData(data);
0474: }
0475: if (child_exists)
0476: notifyNodeModified(fqn);
0477: else
0478: notifyNodeAdded(fqn);
0479: }
0480:
0481: public void _put(String fqn, String key, Object value) {
0482: Node n;
0483: StringHolder child_name = new StringHolder();
0484: boolean child_exists = false;
0485:
0486: if (fqn == null || key == null || value == null)
0487: return;
0488: n = findParentNode(fqn, child_name, true);
0489: if (child_name.getValue() != null) {
0490: child_exists = n.childExists(child_name.getValue());
0491: n.createChild(child_name.getValue(), fqn, n, key, value);
0492: } else {
0493: child_exists = true;
0494: n.setData(key, value);
0495: }
0496: if (child_exists)
0497: notifyNodeModified(fqn);
0498: else
0499: notifyNodeAdded(fqn);
0500: }
0501:
0502: public void _remove(String fqn) {
0503: Node n;
0504: StringHolder child_name = new StringHolder();
0505:
0506: if (fqn == null)
0507: return;
0508: if (fqn.equals(SEPARATOR)) {
0509: root.removeAll();
0510: notifyNodeRemoved(fqn);
0511: return;
0512: }
0513: n = findParentNode(fqn, child_name, false);
0514: if (n == null)
0515: return;
0516: n.removeChild(child_name.getValue(), fqn);
0517: notifyNodeRemoved(fqn);
0518: }
0519:
0520: public void _remove(String fqn, String key) {
0521: Node n;
0522:
0523: if (fqn == null || key == null)
0524: return;
0525: n = findNode(fqn);
0526: if (n != null)
0527: n.removeData(key);
0528: }
0529:
0530: public void _removeData(String fqn) {
0531: Node n;
0532:
0533: if (fqn == null)
0534: return;
0535: n = findNode(fqn);
0536: if (n != null)
0537: n.removeData();
0538: }
0539:
0540: /* ----------------- End of Callbacks ---------------------- */
0541:
0542: /*-------------------- MessageListener ----------------------*/
0543:
0544: /** Callback. Process the contents of the message; typically an _add() or _set() request */
0545: public void receive(Message msg) {
0546: Request req = null;
0547:
0548: if (msg == null || msg.getLength() == 0)
0549: return;
0550: try {
0551: req = (Request) msg.getObject();
0552: request_queue.add(req);
0553: } catch (QueueClosedException queue_closed_ex) {
0554: if (log.isErrorEnabled())
0555: log.error("request queue is null");
0556: } catch (Exception ex) {
0557: if (log.isErrorEnabled())
0558: log.error("failed unmarshalling request: " + ex);
0559: }
0560: }
0561:
0562: /** Return a copy of the current cache (tree) */
0563: public byte[] getState() {
0564: try {
0565: return Util.objectToByteBuffer(root.clone());
0566: } catch (Throwable ex) {
0567: if (log.isErrorEnabled())
0568: log.error("exception returning cache: " + ex);
0569: return null;
0570: }
0571: }
0572:
0573: /** Set the cache (tree) to this value */
0574: public void setState(byte[] new_state) {
0575: Node new_root = null;
0576: Object obj;
0577:
0578: if (new_state == null) {
0579: if (log.isInfoEnabled())
0580: log.info("new cache is null");
0581: return;
0582: }
0583: try {
0584: obj = Util.objectFromByteBuffer(new_state);
0585: new_root = (Node) ((Node) obj).clone();
0586: root = new_root;
0587: notifyAllNodesCreated(root);
0588: } catch (Throwable ex) {
0589: if (log.isErrorEnabled())
0590: log.error("could not set cache: " + ex);
0591: }
0592: }
0593:
0594: /*-------------------- End of MessageListener ----------------------*/
0595:
0596: /*----------------------- MembershipListener ------------------------*/
0597:
0598: public void viewAccepted(View new_view) {
0599: Vector new_mbrs = new_view.getMembers();
0600:
0601: // todo: if MergeView, fetch and reconcile state from coordinator
0602: // actually maybe this is best left up to the application ? we just notify them and let
0603: // the appl handle it ?
0604:
0605: if (new_mbrs != null) {
0606: notifyViewChange(new_view);
0607: members.removeAllElements();
0608: for (int i = 0; i < new_mbrs.size(); i++)
0609: members.addElement(new_mbrs.elementAt(i));
0610: }
0611: //if size is bigger than one, there are more peers in the group
0612: //otherwise there is only one server.
0613: send_message = members.size() > 1;
0614: }
0615:
0616: /** Called when a member is suspected */
0617: public void suspect(Address suspected_mbr) {
0618: ;
0619: }
0620:
0621: /** Block sending and receiving of messages until viewAccepted() is called */
0622: public void block() {
0623: }
0624:
0625: /*------------------- End of MembershipListener ----------------------*/
0626:
0627: /** Request handler thread */
0628: public void run() {
0629: Request req;
0630: String fqn = null;
0631:
0632: while (request_handler != null) {
0633: try {
0634: req = (Request) request_queue.remove(0);
0635: fqn = req.fqn;
0636: switch (req.type) {
0637: case Request.PUT:
0638: if (req.key != null && req.value != null)
0639: _put(fqn, req.key, req.value);
0640: else
0641: _put(fqn, req.data);
0642: break;
0643: case Request.REMOVE:
0644: if (req.key != null)
0645: _remove(fqn, req.key);
0646: else
0647: _remove(fqn);
0648: break;
0649: default:
0650: if (log.isErrorEnabled())
0651: log.error("type " + req.type + " unknown");
0652: break;
0653: }
0654: } catch (QueueClosedException queue_closed_ex) {
0655: request_handler = null;
0656: break;
0657: } catch (Throwable other_ex) {
0658: if (log.isWarnEnabled())
0659: log.warn("exception processing request: "
0660: + other_ex);
0661: }
0662: }
0663: }
0664:
0665: /**
0666: * Find the node just <em>above</em> the one indicated by <code>fqn</code>. This is needed in many cases,
0667: * e.g. to add a new node or remove an existing node.
0668: * @param fqn The fully qualified name of the node.
0669: * @param child_name Will be filled with the name of the child when this method returns. The child name
0670: * is the last relative name of the <code>fqn</code>, e.g. in "/a/b/c" it would be "c".
0671: * @param create_if_not_exists Create parent nodes along the way if they don't exist. Otherwise, this method
0672: * will return when a node cannot be found.
0673: */
0674: Node findParentNode(String fqn, StringHolder child_name,
0675: boolean create_if_not_exists) {
0676: Node curr = root, node;
0677: StringTokenizer tok;
0678: String name;
0679: StringBuffer sb = null;
0680:
0681: if (fqn == null || fqn.equals(SEPARATOR) || "".equals(fqn))
0682: return curr;
0683:
0684: sb = new StringBuffer();
0685: tok = new StringTokenizer(fqn, SEPARATOR);
0686: while (tok.countTokens() > 1) {
0687: name = tok.nextToken();
0688: sb.append(SEPARATOR).append(name);
0689: node = curr.getChild(name);
0690: if (node == null && create_if_not_exists)
0691: node = curr
0692: .createChild(name, sb.toString(), null, null);
0693: if (node == null)
0694: return null;
0695: else
0696: curr = node;
0697: }
0698:
0699: if (tok.countTokens() > 0 && child_name != null)
0700: child_name.setValue(tok.nextToken());
0701: return curr;
0702: }
0703:
0704: /**
0705: * Returns the node at fqn. This method should not be used by clients (therefore it is package-private):
0706: * it is only used internally (for navigation). C++ 'friend' would come in handy here...
0707: * @param fqn The fully qualified name of the node
0708: * @return Node The node at fqn
0709: */
0710: Node findNode(String fqn) {
0711: StringHolder sh = new StringHolder();
0712: Node n = findParentNode(fqn, sh, false);
0713: String child_name = sh.getValue();
0714:
0715: if (fqn == null || fqn.equals(SEPARATOR) || "".equals(fqn))
0716: return root;
0717:
0718: if (n == null || child_name == null)
0719: return null;
0720: else
0721: return n.getChild(child_name);
0722: }
0723:
0724: void notifyNodeAdded(String fqn) {
0725: for (int i = 0; i < listeners.size(); i++)
0726: ((ReplicatedTreeListener) listeners.elementAt(i))
0727: .nodeAdded(fqn);
0728: }
0729:
0730: void notifyNodeRemoved(String fqn) {
0731: for (int i = 0; i < listeners.size(); i++)
0732: ((ReplicatedTreeListener) listeners.elementAt(i))
0733: .nodeRemoved(fqn);
0734: }
0735:
0736: void notifyNodeModified(String fqn) {
0737: for (int i = 0; i < listeners.size(); i++)
0738: ((ReplicatedTreeListener) listeners.elementAt(i))
0739: .nodeModified(fqn);
0740: }
0741:
0742: void notifyViewChange(View v) {
0743: for (int i = 0; i < listeners.size(); i++)
0744: ((ReplicatedTreeListener) listeners.elementAt(i))
0745: .viewChange(v);
0746: }
0747:
0748: /** Generates NodeAdded notifications for all nodes of the tree. This is called whenever the tree is
0749: initially retrieved (state transfer) */
0750: void notifyAllNodesCreated(Node curr) {
0751: Node n;
0752: Map children;
0753:
0754: if (curr == null)
0755: return;
0756: notifyNodeAdded(curr.fqn);
0757: if ((children = curr.getChildren()) != null) {
0758: for (Iterator it = children.values().iterator(); it
0759: .hasNext();) {
0760: n = (Node) it.next();
0761: notifyAllNodesCreated(n);
0762: }
0763: }
0764: }
0765:
0766: public static class Node implements Serializable {
0767: String name = null; // relative name (e.g. "Security")
0768: String fqn = null; // fully qualified name (e.g. "/federations/fed1/servers/Security")
0769: Node parent = null; // parent node
0770: TreeMap children = null; // keys: child name, value: Node
0771: HashMap data = null; // data for current node
0772: private static final long serialVersionUID = -3077676554440038890L;
0773:
0774: // Address creator=null; // member that created this node (needed ?)
0775:
0776: private Node(String child_name, String fqn, Node parent,
0777: HashMap data) {
0778: name = child_name;
0779: this .fqn = fqn;
0780: this .parent = parent;
0781: if (data != null)
0782: this .data = (HashMap) data.clone();
0783: }
0784:
0785: private Node(String child_name, String fqn, Node parent,
0786: String key, Object value) {
0787: name = child_name;
0788: this .fqn = fqn;
0789: this .parent = parent;
0790: if (data == null)
0791: data = new HashMap();
0792: data.put(key, value);
0793: }
0794:
0795: void setData(Map data) {
0796: if (data == null)
0797: return;
0798: if (this .data == null)
0799: this .data = new HashMap();
0800: this .data.putAll(data);
0801: }
0802:
0803: void setData(String key, Object value) {
0804: if (this .data == null)
0805: this .data = new HashMap();
0806: this .data.put(key, value);
0807: }
0808:
0809: HashMap getData() {
0810: return data;
0811: }
0812:
0813: Object getData(String key) {
0814: return data != null ? data.get(key) : null;
0815: }
0816:
0817: boolean childExists(String child_name) {
0818: if (child_name == null)
0819: return false;
0820: return children != null && children.containsKey(child_name);
0821: }
0822:
0823: Node createChild(String child_name, String fqn, Node parent,
0824: HashMap data) {
0825: Node child = null;
0826:
0827: if (child_name == null)
0828: return null;
0829: if (children == null)
0830: children = new TreeMap();
0831: child = (Node) children.get(child_name);
0832: if (child != null)
0833: child.setData(data);
0834: else {
0835: child = new Node(child_name, fqn, parent, data);
0836: children.put(child_name, child);
0837: }
0838: return child;
0839: }
0840:
0841: Node createChild(String child_name, String fqn, Node parent,
0842: String key, Object value) {
0843: Node child = null;
0844:
0845: if (child_name == null)
0846: return null;
0847: if (children == null)
0848: children = new TreeMap();
0849: child = (Node) children.get(child_name);
0850: if (child != null)
0851: child.setData(key, value);
0852: else {
0853: child = new Node(child_name, fqn, parent, key, value);
0854: children.put(child_name, child);
0855: }
0856: return child;
0857: }
0858:
0859: Node getChild(String child_name) {
0860: return child_name == null ? null : children == null ? null
0861: : (Node) children.get(child_name);
0862: }
0863:
0864: Map getChildren() {
0865: return children;
0866: }
0867:
0868: void removeData(String key) {
0869: if (data != null)
0870: data.remove(key);
0871: }
0872:
0873: void removeData() {
0874: if (data != null)
0875: data.clear();
0876: }
0877:
0878: void removeChild(String child_name, String fqn) {
0879: if (child_name != null && children != null
0880: && children.containsKey(child_name)) {
0881: children.remove(child_name);
0882: }
0883: }
0884:
0885: void removeAll() {
0886: if (children != null)
0887: children.clear();
0888: }
0889:
0890: void print(StringBuffer sb, int indent) {
0891: printIndent(sb, indent);
0892: sb.append(SEPARATOR).append(name);
0893: if (children != null && children.size() > 0) {
0894: Collection values = children.values();
0895: for (Iterator it = values.iterator(); it.hasNext();) {
0896: sb.append('\n');
0897: ((Node) it.next()).print(sb, indent + INDENT);
0898: }
0899: }
0900: }
0901:
0902: void printIndent(StringBuffer sb, int indent) {
0903: if (sb != null) {
0904: for (int i = 0; i < indent; i++)
0905: sb.append(' ');
0906: }
0907: }
0908:
0909: public String toString() {
0910: StringBuffer sb = new StringBuffer();
0911: if (name != null)
0912: sb.append("\nname=" + name);
0913: if (fqn != null)
0914: sb.append("\nfqn=" + fqn);
0915: if (data != null)
0916: sb.append("\ndata=" + data);
0917: return sb.toString();
0918: }
0919:
0920: public Object clone() throws CloneNotSupportedException {
0921: Node n = new Node(name, fqn, parent != null ? (Node) parent
0922: .clone() : null, data);
0923: if (children != null)
0924: n.children = (TreeMap) children.clone();
0925: return n;
0926: }
0927:
0928: }
0929:
0930: private static class StringHolder {
0931: String s = null;
0932:
0933: private StringHolder() {
0934: }
0935:
0936: void setValue(String s) {
0937: this .s = s;
0938: }
0939:
0940: String getValue() {
0941: return s;
0942: }
0943: }
0944:
0945: /**
0946: * Class used to multicast add(), remove() and set() methods to all members.
0947: */
0948: private static class Request implements Serializable {
0949: static final int PUT = 1;
0950: static final int REMOVE = 2;
0951:
0952: int type = 0;
0953: String fqn = null;
0954: String key = null;
0955: Object value = null;
0956: HashMap data = null;
0957: private static final long serialVersionUID = 7772753222127676782L;
0958:
0959: private Request(int type, String fqn) {
0960: this .type = type;
0961: this .fqn = fqn;
0962: }
0963:
0964: private Request(int type, String fqn, HashMap data) {
0965: this (type, fqn);
0966: this .data = data;
0967: }
0968:
0969: private Request(int type, String fqn, String key) {
0970: this (type, fqn);
0971: this .key = key;
0972: }
0973:
0974: private Request(int type, String fqn, String key, Object value) {
0975: this (type, fqn);
0976: this .key = key;
0977: this .value = value;
0978: }
0979:
0980: public String toString() {
0981: StringBuffer sb = new StringBuffer();
0982: sb.append(type2String(type)).append(" (");
0983: if (fqn != null)
0984: sb.append(" fqn=" + fqn);
0985: switch (type) {
0986: case PUT:
0987: if (data != null)
0988: sb.append(", data=" + data);
0989: if (key != null)
0990: sb.append(", key=" + key);
0991: if (value != null)
0992: sb.append(", value=" + value);
0993: break;
0994: case REMOVE:
0995: if (key != null)
0996: sb.append(", key=" + key);
0997: break;
0998: default:
0999: break;
1000: }
1001: sb.append(')');
1002: return sb.toString();
1003: }
1004:
1005: static String type2String(int t) {
1006: switch (t) {
1007: case PUT:
1008: return "PUT";
1009: case REMOVE:
1010: return "REMOVE";
1011: default:
1012: return "UNKNOWN";
1013: }
1014: }
1015:
1016: }
1017:
1018: public static void main(String[] args) {
1019:
1020: ReplicatedTree tree = null;
1021: HashMap m = new HashMap();
1022: String props;
1023:
1024: props = "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;"
1025: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
1026: + "PING(timeout=2000;num_initial_members=3):"
1027: + "MERGE2(min_interval=5000;max_interval=10000):"
1028: + "FD_SOCK:"
1029: + "VERIFY_SUSPECT(timeout=1500):"
1030: + "pbcast.STABLE(desired_avg_gossip=20000):"
1031: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
1032: + "UNICAST(timeout=5000):"
1033: + "FRAG(frag_size=16000;down_thread=false;up_thread=false):"
1034: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
1035: + "shun=false;print_local_addr=true):"
1036: + "pbcast.STATE_TRANSFER";
1037: // "PERF(details=true)";
1038:
1039: try {
1040:
1041: tree = new ReplicatedTree(null, props, 10000);
1042: // tree.setRemoteCalls(false);
1043: tree.addReplicatedTreeListener(new MyListener());
1044: tree.put("/a/b/c", null);
1045: tree.put("/a/b/c1", null);
1046: tree.put("/a/b/c2", null);
1047: tree.put("/a/b1/chat", null);
1048: tree.put("/a/b1/chat2", null);
1049: tree.put("/a/b1/chat5", null);
1050: System.out.println(tree);
1051: m.put("name", "Bela Ban");
1052: m.put("age", new Integer(36));
1053: m.put("cube", "240-17");
1054: tree.put("/a/b/c", m);
1055: System.out.println("info for for \"/a/b/c\" is "
1056: + tree.print("/a/b/c"));
1057: tree.put("/a/b/c", "age", new Integer(37));
1058: System.out.println("info for for \"/a/b/c\" is "
1059: + tree.print("/a/b/c"));
1060: tree.remove("/a/b");
1061: System.out.println(tree);
1062: } catch (Exception ex) {
1063: System.err.println(ex);
1064: }
1065: }
1066:
1067: static class MyListener implements ReplicatedTreeListener {
1068:
1069: public void nodeAdded(String fqn) {
1070: System.out.println("** node added: " + fqn);
1071: }
1072:
1073: public void nodeRemoved(String fqn) {
1074: System.out.println("** node removed: " + fqn);
1075: }
1076:
1077: public void nodeModified(String fqn) {
1078: System.out.println("** node modified: " + fqn);
1079: }
1080:
1081: public void viewChange(View new_view) {
1082: System.out.println("** view change: " + new_view);
1083: }
1084:
1085: }
1086:
1087: }
|