001: // $Id: QuoteServer.java,v 1.10 2006/05/03 08:20:15 belaban Exp $
002:
003: package org.jgroups.demos;
004:
005: import org.jgroups.*;
006: import org.jgroups.blocks.RpcDispatcher;
007: import org.jgroups.util.Util;
008: import org.apache.commons.logging.Log;
009: import org.apache.commons.logging.LogFactory;
010:
011: import java.util.Date;
012: import java.util.Enumeration;
013: import java.util.Hashtable;
014:
015: /**
016: * Example of a replicated quote server. The server maintains state which consists of a list
017: * of quotes and their corresponding values. When it is started, it tries to reach other
018: * quote servers to get its initial state. If it does not receive any response after 5
019: * seconds, it assumes it is the first server and starts processing requests. When it
020: * receives a view notification it checks whether there are more members in the view than in
021: * its previous view. If this is the case, it broadcasts a request for the state to all
022: * members. Integration of different states is simply the union of all states (with the
023: * danger of overwriting mutually inconsistent state).<p> This mechanism allows both for
024: * fast discovery of initial servers, and - in the case of partition merging - for
025: * reintegration of existing servers. Broadcasting the state request upon every view change
026: * (only when new members have joined) potentially causes a lot of network traffic, but it
027: * is assumed that there will be at most 5 quote servers.
028: * @author Bela Ban
029: */
030:
031: public class QuoteServer implements MembershipListener, MessageListener {
032: final Hashtable stocks = new Hashtable();
033: Channel channel;
034: RpcDispatcher disp;
035: static final String channel_name = "Quotes";
036: final int num_members = 1;
037: Log log = LogFactory.getLog(getClass());
038:
039: final String props = null; // default stack from JChannel
040:
041: private void integrate(Hashtable state) {
042: String key;
043: if (state == null)
044: return;
045: for (Enumeration e = state.keys(); e.hasMoreElements();) {
046: key = (String) e.nextElement();
047: stocks.put(key, state.get(key)); // just overwrite
048: }
049: }
050:
051: public void viewAccepted(View new_view) {
052: System.out.println("Accepted view (" + new_view.size()
053: + new_view.getMembers() + ')');
054: }
055:
056: public void suspect(Address suspected_mbr) {
057: }
058:
059: public void block() {
060: }
061:
062: public void start() {
063: try {
064: channel = new JChannel(props);
065: disp = new RpcDispatcher(channel, this , this , this );
066: channel.connect(channel_name);
067: System.out.println("\nQuote Server started at "
068: + new Date());
069: System.out.println("Joined channel '" + channel_name
070: + "' (" + channel.getView().size() + " members)");
071: channel.getState(null, 0);
072: System.out.println("Ready to serve requests");
073: } catch (Exception e) {
074: log.error("QuoteServer.start() : " + e);
075: System.exit(-1);
076: }
077: }
078:
079: /* Quote methods: */
080:
081: public float getQuote(String stock_name) throws Exception {
082: System.out.print("Getting quote for " + stock_name + ": ");
083: Float retval = (Float) stocks.get(stock_name);
084: if (retval == null) {
085: System.out.println("not found");
086: throw new Exception("Stock " + stock_name + " not found");
087: }
088: System.out.println(retval.floatValue());
089: return retval.floatValue();
090: }
091:
092: public void setQuote(String stock_name, Float value) {
093: System.out.println("Setting quote for " + stock_name + ": "
094: + value);
095: stocks.put(stock_name, value);
096: }
097:
098: public Hashtable getAllStocks() {
099: System.out.print("getAllStocks: ");
100: printAllStocks();
101: return stocks;
102: }
103:
104: public void printAllStocks() {
105: System.out.println(stocks);
106: }
107:
108: public void receive(Message msg) {
109: }
110:
111: public byte[] getState() {
112: try {
113: return Util.objectToByteBuffer(stocks.clone());
114: } catch (Exception ex) {
115: ex.printStackTrace();
116: return null;
117: }
118: }
119:
120: public void setState(byte[] state) {
121: try {
122: integrate((Hashtable) Util.objectFromByteBuffer(state));
123: } catch (Exception ex) {
124: ex.printStackTrace();
125: }
126: }
127:
128: public static void main(String args[]) {
129: try {
130: QuoteServer server = new QuoteServer();
131: server.start();
132: while (true) {
133: Util.sleep(10000);
134: }
135: } catch (Throwable t) {
136: t.printStackTrace();
137: }
138: }
139:
140: }
|