001: // $Id: DistributedTree.java,v 1.15 2006/03/27 08:34:24 belaban Exp $
002:
003: package org.jgroups.blocks;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.*;
008: import org.jgroups.util.Util;
009:
010: import java.io.Serializable;
011: import java.util.StringTokenizer;
012: import java.util.Vector;
013:
014: /**
015: * A tree-like structure that is replicated across several members. Updates will be multicast to all group
016: * members reliably and in the same order.
017: * @author Bela Ban
018: * @author <a href="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a>
019: */
020: public class DistributedTree implements MessageListener,
021: MembershipListener {
/** Root node ("/") of the tree; null until a node is added via _add() or state is installed via setState(). */
Node root = null;
/** Registered DistributedTreeListener instances (see addDistributedTreeListener()). */
final Vector listeners = new Vector();
/** Registered ViewListener instances (see addViewListener()). */
final Vector view_listeners = new Vector();
/** Members of the view most recently passed to viewAccepted(). */
final Vector members = new Vector();
/** Channel used by this tree; null until start() creates it or the adapter-based constructor sets it. */
protected Channel channel = null;
/** Dispatcher through which the _add/_remove/_set/_reset callbacks are invoked on the group. */
protected RpcDispatcher disp = null;
/** Name of the group passed to channel.connect() in start(). */
String groupname = "DistributedTreeGroup";
/** Protocol stack specification passed to the JChannel constructor in start(). */
String channel_properties = "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=0):"
+ "PING(timeout=5000;num_initial_members=6):"
+ "FD_SOCK:"
+ "VERIFY_SUSPECT(timeout=1500):"
+ "pbcast.STABLE(desired_avg_gossip=10000):"
+ "pbcast.NAKACK(gc_lag=5;retransmit_timeout=3000;trace=true):"
+ "UNICAST(timeout=5000):"
+ "FRAG(down_thread=false;up_thread=false):"
+ "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
+ "shun=false;print_local_addr=true):"
+ "pbcast.STATE_TRANSFER(trace=true)";
// NOTE(review): this field is not read anywhere in the code visible here; the adapter-based
// constructor uses its own parameter of the same name and start() defaults to 8000 ms.
final long state_timeout = 5000; // wait 5 secs max to obtain state

/** Determines whether updates have to be sent across the network; avoids sending unnecessary
* messages when there are no other members in the group. Updated by viewAccepted(). */
private boolean send_message = false;

/** Logger used by the tree and by its nested Node class. */
protected static final Log log = LogFactory
.getLog(DistributedTree.class);
048:
/**
 * Observer of changes to the tree. Implementations are registered with
 * addDistributedTreeListener() and are called from the _add/_remove/_set/_reset callbacks.
 */
public interface DistributedTreeListener {

    /**
     * A node was created.
     * @param fqn     fully qualified name of the new node, e.g. "/a/b"
     * @param element value stored in the new node, may be null
     */
    void nodeAdded(String fqn, Serializable element);

    /**
     * A node was removed.
     * @param fqn fully qualified name of the removed node
     */
    void nodeRemoved(String fqn);

    /**
     * The value of an existing node was replaced.
     * @param fqn         fully qualified name of the node
     * @param old_element value reported as the previous one, may be null
     * @param new_element value now stored in the node
     */
    void nodeModified(String fqn, Serializable old_element,
            Serializable new_element);
}
057:
058: public interface ViewListener {
059: void viewChange(Vector new_mbrs, Vector old_mbrs);
060: }
061:
062: public DistributedTree() {
063: }
064:
/**
 * Creates a tree for the given group. No channel is created here; that happens in start().
 * @param groupname          name of the group to connect to (stored as given, even if null)
 * @param channel_properties protocol stack specification; when null the built-in default is kept
 */
public DistributedTree(String groupname, String channel_properties) {
    this.groupname = groupname;
    if (channel_properties == null) {
        return; // keep the default stack
    }
    this.channel_properties = channel_properties;
}
070:
071: /*
072: * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
073: * used to register under that id. This is typically used when another building block is already using
074: * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
075: * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
076: * first block created on PullPushAdapter.
077: * @param adapter The PullPushAdapter which to use as underlying transport
078: * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
079: * requests/responses for different building blocks on top of PullPushAdapter.
080: * @param state_timeout Max number of milliseconds to wait until state is
081: * retrieved
082: */
083: public DistributedTree(PullPushAdapter adapter, Serializable id,
084: long state_timeout) throws ChannelException {
085: channel = (Channel) adapter.getTransport();
086: disp = new RpcDispatcher(adapter, id, this , this , this );
087: boolean rc = channel.getState(null, state_timeout);
088: if (rc) {
089: if (log.isInfoEnabled())
090: log.info("state was retrieved successfully");
091: } else if (log.isInfoEnabled())
092: log
093: .info("state could not be retrieved (must be first member in group)");
094: }
095:
096: public Object getLocalAddress() {
097: return channel != null ? channel.getLocalAddress() : null;
098: }
099:
100: public void setDeadlockDetection(boolean flag) {
101: if (disp != null)
102: disp.setDeadlockDetection(flag);
103: }
104:
105: public void start() throws Exception {
106: start(8000);
107: }
108:
109: public void start(long timeout) throws Exception {
110: if (channel != null) // already started
111: return;
112: channel = new JChannel(channel_properties);
113: disp = new RpcDispatcher(channel, this , this , this );
114: channel.connect(groupname);
115: boolean rc = channel.getState(null, timeout);
116: if (rc) {
117: if (log.isInfoEnabled())
118: log.info("state was retrieved successfully");
119: } else if (log.isInfoEnabled())
120: log
121: .info("state could not be retrieved (must be first member in group)");
122: }
123:
124: public void stop() {
125: if (channel != null) {
126: channel.close();
127: disp.stop();
128: }
129: channel = null;
130: disp = null;
131: }
132:
133: public void addDistributedTreeListener(
134: DistributedTreeListener listener) {
135: if (!listeners.contains(listener))
136: listeners.addElement(listener);
137: }
138:
139: public void removeDistributedTreeListener(
140: DistributedTreeListener listener) {
141: listeners.removeElement(listener);
142: }
143:
144: public void addViewListener(ViewListener listener) {
145: if (!view_listeners.contains(listener))
146: view_listeners.addElement(listener);
147: }
148:
149: public void removeViewListener(ViewListener listener) {
150: view_listeners.removeElement(listener);
151: }
152:
153: public void add(String fqn) {
154: //Changes done by <aos>
155: //if true, propagate action to the group
156: if (send_message == true) {
157: try {
158: MethodCall call = new MethodCall("_add",
159: new Object[] { fqn },
160: new String[] { String.class.getName() });
161: disp.callRemoteMethods(null, call,
162: GroupRequest.GET_ALL, 0);
163: } catch (Exception ex) {
164: if (log.isErrorEnabled())
165: log.error("exception=" + ex);
166: }
167: } else {
168: _add(fqn);
169: }
170: }
171:
172: public void add(String fqn, Serializable element) {
173: add(fqn, element, 0);
174: }
175:
176: /** resets an existing node, useful after a merge when you want to tell other
177: * members of your state, but do not wish to remove and then add as two separate calls */
178: public void reset(String fqn, Serializable element) {
179: reset(fqn, element, 0);
180: }
181:
182: public void remove(String fqn) {
183: remove(fqn, 0);
184: }
185:
186: public void add(String fqn, Serializable element, int timeout) {
187: //Changes done by <aos>
188: //if true, propagate action to the group
189: if (send_message == true) {
190: try {
191: MethodCall call = new MethodCall("_add", new Object[] {
192: fqn, element }, new String[] {
193: String.class.getName(),
194: Serializable.class.getName() });
195: disp.callRemoteMethods(null, call,
196: GroupRequest.GET_ALL, timeout);
197: } catch (Exception ex) {
198: if (log.isErrorEnabled())
199: log.error("exception=" + ex);
200: }
201: } else {
202: _add(fqn, element);
203: }
204: }
205:
206: /** resets an existing node, useful after a merge when you want to tell other
207: * members of your state, but do not wish to remove and then add as two separate calls */
208: public void reset(String fqn, Serializable element, int timeout) {
209: //Changes done by <aos>
210: //if true, propagate action to the group
211: if (send_message == true) {
212: try {
213: MethodCall call = new MethodCall("_reset",
214: new Object[] { fqn, element }, new String[] {
215: String.class.getName(),
216: Serializable.class.getName() });
217: disp.callRemoteMethods(null, call,
218: GroupRequest.GET_ALL, timeout);
219: } catch (Exception ex) {
220: if (log.isErrorEnabled())
221: log.error("exception=" + ex);
222: }
223: } else {
224: _add(fqn, element);
225: }
226: }
227:
228: public void remove(String fqn, int timeout) {
229: //Changes done by <aos>
230: //if true, propagate action to the group
231: if (send_message == true) {
232: try {
233: MethodCall call = new MethodCall("_remove",
234: new Object[] { fqn },
235: new String[] { String.class.getName() });
236: disp.callRemoteMethods(null, call,
237: GroupRequest.GET_ALL, timeout);
238: } catch (Exception ex) {
239: if (log.isErrorEnabled())
240: log.error("exception=" + ex);
241: }
242: } else {
243: _remove(fqn);
244: }
245: }
246:
247: public boolean exists(String fqn) {
248: if (fqn == null)
249: return false;
250: return findNode(fqn) == null ? false : true;
251: }
252:
253: public Serializable get(String fqn) {
254: Node n = null;
255:
256: if (fqn == null)
257: return null;
258: n = findNode(fqn);
259: if (n != null) {
260: return n.element;
261: }
262: return null;
263: }
264:
265: public void set(String fqn, Serializable element) {
266: set(fqn, element, 0);
267: }
268:
269: public void set(String fqn, Serializable element, int timeout) {
270: //Changes done by <aos>
271: //if true, propagate action to the group
272: if (send_message == true) {
273: try {
274: MethodCall call = new MethodCall("_set", new Object[] {
275: fqn, element }, new String[] {
276: String.class.getName(),
277: Serializable.class.getName() });
278: disp.callRemoteMethods(null, call,
279: GroupRequest.GET_ALL, timeout);
280: } catch (Exception ex) {
281: if (log.isErrorEnabled())
282: log.error("exception=" + ex);
283: }
284: } else {
285: _set(fqn, element);
286: }
287: }
288:
289: /** Returns all children of a Node as strings */
290: public Vector getChildrenNames(String fqn) {
291: Vector ret = new Vector();
292: Node n;
293:
294: if (fqn == null)
295: return ret;
296: n = findNode(fqn);
297: if (n == null || n.children == null)
298: return ret;
299: for (int i = 0; i < n.children.size(); i++)
300: ret.addElement(((Node) n.children.elementAt(i)).name);
301: return ret;
302: }
303:
304: public String print() {
305: StringBuffer sb = new StringBuffer();
306: int indent = 0;
307:
308: if (root == null)
309: return "/";
310:
311: sb.append(root.print(indent));
312: return sb.toString();
313: }
314:
315: /** Returns all children of a Node as Nodes */
316: Vector getChildren(String fqn) {
317: Node n;
318:
319: if (fqn == null)
320: return null;
321: n = findNode(fqn);
322: if (n == null)
323: return null;
324: return n.children;
325: }
326:
327: /**
328: * Returns the name of the group that the DistributedTree is connected to
329: * @return String
330: */
331: public String getGroupName() {
332: return groupname;
333: }
334:
335: /**
336: * Returns the Channel the DistributedTree is connected to
337: * @return Channel
338: */
339: public Channel getChannel() {
340: return channel;
341: }
342:
343: /**
344: * Returns the number of current members joined to the group
345: * @return int
346: */
347: public int getGroupMembersNumber() {
348: return members.size();
349: }
350:
351: /*--------------------- Callbacks --------------------------*/
352:
353: public void _add(String fqn) {
354: _add(fqn, null);
355: }
356:
357: public void _add(String fqn, Serializable element) {
358: Node curr, n;
359: StringTokenizer tok;
360: String child_name;
361: String tmp_fqn = "";
362:
363: if (root == null) {
364: root = new Node("/", null);
365: notifyNodeAdded("/", null);
366: }
367: if (fqn == null)
368: return;
369: curr = root;
370: tok = new StringTokenizer(fqn, "/");
371:
372: while (tok.hasMoreTokens()) {
373: child_name = tok.nextToken();
374: tmp_fqn = tmp_fqn + '/' + child_name;
375: n = curr.findChild(child_name);
376: if (n == null) {
377: n = new Node(child_name, null);
378: curr.addChild(n);
379: if (!tok.hasMoreTokens()) {
380: n.element = element;
381: notifyNodeAdded(tmp_fqn, element);
382: return;
383: } else
384: notifyNodeAdded(tmp_fqn, null);
385: }
386: curr = n;
387: }
388: curr.element = element;
389: notifyNodeModified(fqn, null, element);
390: }
391:
392: public void _remove(String fqn) {
393: Node curr, n;
394: StringTokenizer tok;
395: String child_name = null;
396:
397: if (fqn == null || root == null)
398: return;
399: curr = root;
400: tok = new StringTokenizer(fqn, "/");
401:
402: while (tok.countTokens() > 1) {
403: child_name = tok.nextToken();
404: n = curr.findChild(child_name);
405: if (n == null) // node does not exist
406: return;
407: curr = n;
408: }
409: try {
410: child_name = tok.nextToken();
411: if (child_name != null) {
412: n = curr.removeChild(child_name);
413: if (n != null)
414: notifyNodeRemoved(fqn);
415: }
416: } catch (Exception ex) {
417: }
418: }
419:
420: public void _set(String fqn, Serializable element) {
421: Node n;
422: Serializable old_el = null;
423:
424: if (fqn == null || element == null)
425: return;
426: n = findNode(fqn);
427: if (n == null) {
428: if (log.isErrorEnabled())
429: log.error("node " + fqn + " not found");
430: return;
431: }
432: old_el = n.element;
433: n.element = element;
434: notifyNodeModified(fqn, old_el, element);
435: }
436:
437: /** similar to set, but does not error if node does not exist, but rather does an add instead */
438: public void _reset(String fqn, Serializable element) {
439: Node n;
440: Serializable old_el = null;
441:
442: if (fqn == null || element == null)
443: return;
444: n = findNode(fqn);
445: if (n == null) {
446: _add(fqn, element);
447: }
448: old_el = n.element;
449: n.element = element;
450: notifyNodeModified(fqn, old_el, element);
451: }
452:
453: /*----------------- End of Callbacks ----------------------*/
454:
455: /*-------------------- State Exchange ----------------------*/
456:
457: public void receive(Message msg) {
458: }
459:
460: /** Return a copy of the tree */
461: public byte[] getState() {
462: Object copy = root != null ? root.copy() : null;
463: try {
464: return Util.objectToByteBuffer(copy);
465: } catch (Throwable ex) {
466: if (log.isErrorEnabled())
467: log.error("exception marshalling state: " + ex);
468: return null;
469: }
470: }
471:
472: public void setState(byte[] data) {
473: Object new_state;
474:
475: try {
476: new_state = Util.objectFromByteBuffer(data);
477: } catch (Throwable ex) {
478: if (log.isErrorEnabled())
479: log.error("exception unmarshalling state: " + ex);
480: return;
481: }
482: if (new_state == null)
483: return;
484: if (!(new_state instanceof Node)) {
485: if (log.isErrorEnabled())
486: log.error("object is not of type 'Node'");
487: return;
488: }
489: root = ((Node) new_state).copy();
490: }
491:
492: /*------------------- Membership Changes ----------------------*/
493:
494: public void viewAccepted(View new_view) {
495: Vector new_mbrs = new_view.getMembers();
496:
497: if (new_mbrs != null) {
498: sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
499: members.removeAllElements();
500: for (int i = 0; i < new_mbrs.size(); i++)
501: members.addElement(new_mbrs.elementAt(i));
502: }
503: //if size is bigger than one, there are more peers in the group
504: //otherwise there is only one server.
505: if (members.size() > 1) {
506: send_message = true;
507: } else {
508: send_message = false;
509: }
510: }
511:
512: /** Called when a member is suspected */
513: public void suspect(Address suspected_mbr) {
514: }
515:
516: /** Block sending and receiving of messages until ViewAccepted is called */
517: public void block() {
518: }
519:
520: void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) {
521: Vector joined, left;
522: Object mbr;
523:
524: if (view_listeners.size() == 0 || old_mbrs == null
525: || new_mbrs == null)
526: return;
527:
528: // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
529: joined = new Vector();
530: for (int i = 0; i < new_mbrs.size(); i++) {
531: mbr = new_mbrs.elementAt(i);
532: if (!old_mbrs.contains(mbr))
533: joined.addElement(mbr);
534: }
535:
536: // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
537: left = new Vector();
538: for (int i = 0; i < old_mbrs.size(); i++) {
539: mbr = old_mbrs.elementAt(i);
540: if (!new_mbrs.contains(mbr))
541: left.addElement(mbr);
542: }
543: notifyViewChange(joined, left);
544: }
545:
546: Node findNode(String fqn) {
547: Node curr = root;
548: StringTokenizer tok;
549: String child_name;
550:
551: if (fqn == null || root == null)
552: return null;
553: if ("/".equals(fqn) || "".equals(fqn))
554: return root;
555:
556: tok = new StringTokenizer(fqn, "/");
557: while (tok.hasMoreTokens()) {
558: child_name = tok.nextToken();
559: curr = curr.findChild(child_name);
560: if (curr == null)
561: return null;
562: }
563: return curr;
564: }
565:
566: void notifyNodeAdded(String fqn, Serializable element) {
567: for (int i = 0; i < listeners.size(); i++)
568: ((DistributedTreeListener) listeners.elementAt(i))
569: .nodeAdded(fqn, element);
570: }
571:
572: void notifyNodeRemoved(String fqn) {
573: for (int i = 0; i < listeners.size(); i++)
574: ((DistributedTreeListener) listeners.elementAt(i))
575: .nodeRemoved(fqn);
576: }
577:
578: void notifyNodeModified(String fqn, Serializable old_element,
579: Serializable new_element) {
580: for (int i = 0; i < listeners.size(); i++)
581: ((DistributedTreeListener) listeners.elementAt(i))
582: .nodeModified(fqn, old_element, new_element);
583: }
584:
585: /** Generates NodeAdded notifications for all nodes of the tree. This is called whenever the tree is
586: initially retrieved (state transfer) */
587: void notifyAllNodesCreated(Node curr, String tmp_fqn) {
588: Node n;
589:
590: if (curr == null)
591: return;
592: if (curr.name == null) {
593: if (log.isErrorEnabled())
594: log.error("curr.name is null");
595: return;
596: }
597:
598: if (curr.children != null) {
599: for (int i = 0; i < curr.children.size(); i++) {
600: n = (Node) curr.children.elementAt(i);
601: System.out.println("*** nodeCreated(): tmp_fqn is "
602: + tmp_fqn);
603: notifyNodeAdded(tmp_fqn, n.element);
604: notifyAllNodesCreated(n, tmp_fqn + '/' + n.name);
605: }
606: }
607: }
608:
609: void notifyViewChange(Vector new_mbrs, Vector old_mbrs) {
610: for (int i = 0; i < view_listeners.size(); i++)
611: ((ViewListener) view_listeners.elementAt(i)).viewChange(
612: new_mbrs, old_mbrs);
613: }
614:
615: private static class Node implements Serializable {
616: String name = null;
617: Vector children = null;
618: Serializable element = null;
619:
620: Node() {
621: }
622:
623: Node(String name, Serializable element) {
624: this .name = name;
625: this .element = element;
626: }
627:
628: void addChild(String relative_name, Serializable element) {
629: if (relative_name == null)
630: return;
631: if (children == null)
632: children = new Vector();
633: else {
634: if (!children.contains(relative_name))
635: children
636: .addElement(new Node(relative_name, element));
637: }
638: }
639:
640: void addChild(Node n) {
641: if (n == null)
642: return;
643: if (children == null)
644: children = new Vector();
645: if (!children.contains(n))
646: children.addElement(n);
647: }
648:
649: Node removeChild(String rel_name) {
650: Node n = findChild(rel_name);
651:
652: if (n != null)
653: children.removeElement(n);
654: return n;
655: }
656:
657: Node findChild(String relative_name) {
658: Node child;
659:
660: if (children == null || relative_name == null)
661: return null;
662: for (int i = 0; i < children.size(); i++) {
663: child = (Node) children.elementAt(i);
664: if (child.name == null) {
665: if (log.isErrorEnabled())
666: log.error("child.name is null for "
667: + relative_name);
668: continue;
669: }
670:
671: if (child.name.equals(relative_name))
672: return child;
673: }
674:
675: return null;
676: }
677:
678: public boolean equals(Object other) {
679: return other != null && ((Node) other).name != null
680: && name != null && name.equals(((Node) other).name);
681: }
682:
683: Node copy() {
684: Node ret = new Node(name, element);
685:
686: if (children != null)
687: ret.children = (Vector) children.clone();
688: return ret;
689: }
690:
691: String print(int indent) {
692: StringBuffer sb = new StringBuffer();
693: boolean is_root = name != null && "/".equals(name);
694:
695: for (int i = 0; i < indent; i++)
696: sb.append(' ');
697: if (!is_root) {
698: if (name == null)
699: sb.append("/<unnamed>");
700: else {
701: sb.append('/' + name);
702: // if(element != null) sb.append(" --> " + element);
703: }
704: }
705: sb.append('\n');
706: if (children != null) {
707: if (is_root)
708: indent = 0;
709: else
710: indent += 4;
711: for (int i = 0; i < children.size(); i++)
712: sb.append(((Node) children.elementAt(i))
713: .print(indent));
714: }
715: return sb.toString();
716: }
717:
718: public String toString() {
719: if (element != null)
720: return "[name: " + name + ", element: " + element + ']';
721: else
722: return "[name: " + name + ']';
723: }
724:
725: }
726:
727: }
|