001: package com.mockrunner.example.jms;
002:
003: import java.util.HashMap;
004: import java.util.Iterator;
005: import java.util.Map;
006:
007: import javax.jms.JMSException;
008: import javax.jms.MapMessage;
009: import javax.jms.Session;
010: import javax.jms.TextMessage;
011: import javax.jms.Topic;
012: import javax.jms.TopicConnection;
013: import javax.jms.TopicConnectionFactory;
014: import javax.jms.TopicPublisher;
015: import javax.jms.TopicSession;
016: import javax.naming.InitialContext;
017:
018: /**
019: * This example class sends multiple messages
020: * to a topic within a transaction. The first message
021: * is a <code>TextMessage</code> with the current
022: * timestamp. The following two messages refer to
023: * the first message with this timestamp as
024: * correlation id. They contain the market rates for
025: * some stocks in the form of maps (the company name
026: * maps to the market rate).
027: */
028: public class StockQuotePublisher {
029: private Map nasdaqRates = new HashMap();
030: private Map dowRates = new HashMap();
031:
032: public void setQuotes(Map nasdaqRates, Map dowRates) {
033: this .nasdaqRates.clear();
034: this .dowRates.clear();
035: this .nasdaqRates.putAll(nasdaqRates);
036: this .dowRates.putAll(dowRates);
037: }
038:
039: public void send() {
040: TopicConnection topicConnection = null;
041: TopicSession topicSession = null;
042: TopicPublisher topicPublisher = null;
043: try {
044: InitialContext initialContext = new InitialContext();
045: TopicConnectionFactory topicFactory = (TopicConnectionFactory) initialContext
046: .lookup("java:/ConnectionFactory");
047: topicConnection = topicFactory.createTopicConnection();
048: topicSession = topicConnection.createTopicSession(true,
049: Session.AUTO_ACKNOWLEDGE);
050: Topic topic = (Topic) initialContext
051: .lookup("topic/quoteTopic");
052: TextMessage timeMessage = createInitialTimestampMessage(topicSession);
053: MapMessage nasdaqMessage = createStockQuoteMessage(
054: topicSession, nasdaqRates);
055: MapMessage dowMessage = createStockQuoteMessage(
056: topicSession, dowRates);
057: nasdaqMessage.setJMSCorrelationID(timeMessage.getText());
058: dowMessage.setJMSCorrelationID(timeMessage.getText());
059: topicPublisher = topicSession.createPublisher(topic);
060: topicPublisher.publish(timeMessage);
061: topicPublisher.publish(nasdaqMessage);
062: topicPublisher.publish(dowMessage);
063: topicSession.commit();
064: } catch (Exception exc) {
065: try {
066: if (null != topicSession)
067: topicSession.rollback();
068: } catch (JMSException jmsExc) {
069: jmsExc.printStackTrace();
070: }
071: } finally {
072: try {
073: if (null != topicPublisher)
074: topicPublisher.close();
075: if (null != topicSession)
076: topicSession.close();
077: if (null != topicConnection)
078: topicConnection.close();
079: } catch (JMSException exc) {
080: exc.printStackTrace();
081: }
082: }
083: }
084:
085: private TextMessage createInitialTimestampMessage(
086: TopicSession topicSession) throws JMSException {
087: TextMessage message = topicSession.createTextMessage();
088: message.setText(String.valueOf(System.currentTimeMillis()));
089: return message;
090: }
091:
092: private MapMessage createStockQuoteMessage(
093: TopicSession topicSession, Map rates) throws JMSException {
094: MapMessage message = topicSession.createMapMessage();
095: Iterator keys = rates.keySet().iterator();
096: while (keys.hasNext()) {
097: String nextKey = (String) keys.next();
098: message.setString(nextKey, (String) rates.get(nextKey));
099: }
100: return message;
101: }
102: }
|