001: package org.jgroups.tests.perf.transports;
002:
003: import org.jgroups.Address;
004: import org.jgroups.JChannel;
005: import org.jgroups.Message;
006: import org.jgroups.jmx.JmxConfigurator;
007: import org.jgroups.tests.perf.Receiver;
008: import org.jgroups.tests.perf.Transport;
009: import org.jgroups.util.Util;
010:
011: import javax.management.MBeanServer;
012: import java.util.Map;
013: import java.util.Properties;
014:
015: /**
016: * @author Bela Ban Jan 22
017: * @author 2004
018: * @version $Id: JGroupsTransport.java,v 1.14 2006/07/31 09:21:59 belaban Exp $
019: */
020: public class JGroupsTransport extends org.jgroups.ReceiverAdapter
021: implements Transport {
022: Properties config = null;
023: JChannel channel = null;
024: Thread t = null;
025: String props = null;
026: String group_name = "perf";
027: Receiver receiver = null;
028: boolean jmx = false;
029:
030: public JGroupsTransport() {
031:
032: }
033:
034: public Object getLocalAddress() {
035: return channel != null ? channel.getLocalAddress() : null;
036: }
037:
038: public void create(Properties properties) throws Exception {
039: this .config = properties;
040: props = config.getProperty("props");
041: jmx = new Boolean(this .config.getProperty("jmx"))
042: .booleanValue();
043: channel = new JChannel(props);
044: channel.setReceiver(this );
045: }
046:
047: public void start() throws Exception {
048: channel.connect(group_name);
049: if (jmx) {
050: MBeanServer server = Util.getMBeanServer();
051: if (server == null) {
052: throw new Exception(
053: "No MBeanServers found;"
054: + "\nneeds to be run with an MBeanServer present, or inside JDK 5");
055: }
056: JmxConfigurator.registerChannel(channel, server,
057: "jgroups.perf", channel.getClusterName(), true);
058: }
059: }
060:
061: public void stop() {
062: if (channel != null) {
063: channel.shutdown(); // will cause thread to terminate anyways
064: }
065: t = null;
066: }
067:
068: public void destroy() {
069: if (channel != null) {
070: channel.close();
071: channel = null;
072: }
073: }
074:
075: public void setReceiver(Receiver r) {
076: this .receiver = r;
077: }
078:
079: public Map dumpStats() {
080: return channel != null ? channel.dumpStats() : null;
081: }
082:
083: public void send(Object destination, byte[] payload)
084: throws Exception {
085: Message msg = new Message((Address) destination, null, payload);
086: if (channel != null)
087: channel.send(msg);
088: }
089:
090: public void receive(Message msg) {
091: Address sender = msg.getSrc();
092: byte[] payload = msg.getBuffer();
093: if (receiver != null) {
094: try {
095: receiver.receive(sender, payload);
096: } catch (Throwable tt) {
097: tt.printStackTrace();
098: }
099: }
100: }
101:
102: }
|