001: package org.jgroups.tests.perf.transports;
002:
003: import org.jgroups.tests.perf.Receiver;
004: import org.jgroups.tests.perf.Transport;
005:
006: import javax.jms.*;
007: import javax.naming.InitialContext;
008: import java.util.Properties;
009: import java.util.Map;
010:
011: /**
012: * @author Bela Ban Jan 22
013: * @author 2004
014: * @version $Id: JmsTransport.java,v 1.7 2006/04/25 11:55:18 belaban Exp $
015: */
016: public class JmsTransport implements Transport, MessageListener {
017: Receiver receiver = null;
018: Properties config = null;
019: Object local_addr = null;
020: ConnectionFactory factory;
021: InitialContext ctx;
022: TopicConnection conn;
023: TopicSession session;
024: TopicPublisher pub;
025: TopicSubscriber sub;
026: Topic topic;
027: String topic_name = "topic/testTopic";
028:
029: public JmsTransport() {
030: }
031:
032: public Object getLocalAddress() {
033: return local_addr;
034: }
035:
036: public void create(Properties properties) throws Exception {
037: this .config = properties;
038:
039: String tmp = config.getProperty("topic");
040: if (tmp != null)
041: topic_name = tmp;
042:
043: ctx = new InitialContext();
044: factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
045:
046: // local_addr=new IpAddress(ucast_sock.getLocalAddress(), ucast_sock.getLocalPort());
047:
048: }
049:
050: public void start() throws Exception {
051: conn = ((TopicConnectionFactory) factory)
052: .createTopicConnection();
053: session = conn.createTopicSession(false,
054: Session.AUTO_ACKNOWLEDGE);
055: topic = (Topic) ctx.lookup(topic_name);
056: pub = session.createPublisher(topic);
057: sub = session.createSubscriber(topic);
058: sub.setMessageListener(this );
059: conn.start();
060: this .local_addr = conn.getClientID();
061: System.out.println("-- local_addr is " + local_addr);
062: }
063:
064: public void stop() {
065: try {
066: conn.stop();
067: } catch (JMSException e) {
068: e.printStackTrace();
069: }
070: }
071:
072: public void destroy() {
073: }
074:
075: public void setReceiver(Receiver r) {
076: this .receiver = r;
077: }
078:
079: public Map dumpStats() {
080: return null;
081: }
082:
083: public void send(Object destination, byte[] payload)
084: throws Exception {
085: if (destination != null)
086: throw new Exception(
087: "JmsTransport.send(): unicast destination is not supported");
088:
089: ObjectMessage msg = session.createObjectMessage(payload);
090: msg.setObjectProperty("sender", local_addr);
091: // msg.setObjectProperty("size", new Integer(payload.length));
092:
093: //todo: write the sender (maybe use ObjectMessage instead of BytesMessage)
094:
095: // msg.writeInt(payload.length);
096: // msg.writeBytes(payload, 0, payload.length);
097: pub.publish(topic, msg);
098: }
099:
100: public void onMessage(Message message) {
101: Object sender = null;
102: if (message == null || !(message instanceof ObjectMessage)) {
103: System.err
104: .println("JmsTransport.onMessage(): received a non ObjectMessage ("
105: + message + "), discarding");
106: return;
107: }
108: ObjectMessage tmp = (ObjectMessage) message;
109: try {
110:
111: // todo: read the sender
112: sender = tmp.getObjectProperty("sender");
113:
114: // int len=tmp.readInt();
115: // int len=((Integer)tmp.getObjectProperty("size")).intValue();
116:
117: byte[] payload = (byte[]) tmp.getObject();
118: if (receiver != null)
119: receiver.receive(sender, payload);
120: } catch (JMSException e) {
121: e.printStackTrace();
122: }
123:
124: }
125:
126: }
|