/*
* @(#)DurableSubscriberExample.java 1.3 02/05/02
*
* Copyright (c) 2000-2002 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The DurableSubscriberExample class demonstrates that a durable subscription
* is active even when the subscriber is not active.
* <p>
* The program contains a DurableSubscriber class, a MultiplePublisher class,
* a main method, and a method that instantiates the classes and calls their
* methods in sequence.
* <p>
* The program begins like any publish/subscribe program: the subscriber starts,
* the publisher publishes some messages, and the subscriber receives them.
* <p>
* At this point the subscriber closes itself. The publisher then publishes
* some messages while the subscriber is not active. The subscriber then
* restarts and receives the messages.
* <p>
* Specify a topic name on the command line when you run the program.
*
* @author Kim Haase
* @version 1.6, 08/18/00
*/
public class DurableSubscriberExample {
// Name of the topic used by both the subscriber and the publisher; main
// assigns it from the single command-line argument.
String topicName = null;
// Status handed to SampleUtilities.exit at the end of main: starts at 0 and
// is set to 1 by the catch blocks that handle failed JMS calls.
int exitResult = 0;
// Index from which the next publishing round numbers its messages;
// publishMessages advances it after each completed round so the numbering
// continues instead of restarting.
static int startindex = 0;
/**
 * The DurableSubscriber class contains a constructor, a startSubscriber
 * method, a closeSubscriber method, and a finish method.
 * <p>
 * The class fetches messages asynchronously, using a message listener,
 * TextListener.
 * <p>
 * JMS failures after construction are reported on standard output and
 * recorded by setting the enclosing example's exitResult to 1.
 *
 * @author Kim Haase
 * @version 1.6, 08/18/00
 */
public class DurableSubscriber {
    /** Name of the durable subscription created, reused and finally removed. */
    private static final String SUBSCRIPTION_NAME = "MakeItLast";

    Connection connection = null;
    Session session = null;
    Topic topic = null;
    // Both are null whenever no subscriber is currently active.
    TopicSubscriber topicSubscriber = null;
    TextListener topicListener = null;

    /**
     * The TextListener class implements the MessageListener interface by
     * defining an onMessage method for the DurableSubscriber class.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    private class TextListener implements MessageListener {
        // Signalled when the end-of-stream control message arrives;
        // closeSubscriber waits on it.
        final SampleUtilities.DoneLatch monitor =
            new SampleUtilities.DoneLatch();

        /**
         * Casts the message to a TextMessage and displays its text.
         * A non-text message is interpreted as the end of the message
         * stream, and the message listener sets its monitor state to all
         * done processing messages.
         *
         * @param message the incoming message
         */
        public void onMessage(Message message) {
            if (!(message instanceof TextMessage)) {
                // Control message: the publisher has finished this round.
                monitor.allDone();
                return;
            }
            TextMessage msg = (TextMessage) message;
            try {
                System.out.println("SUBSCRIBER: Reading message: "
                    + msg.getText());
            } catch (JMSException e) {
                System.out.println("Exception in onMessage(): "
                    + e.toString());
            }
        }
    }

    /**
     * Constructor: looks up a connection factory and topic and creates a
     * connection and session. The connection is given the client ID
     * "DurableSubscriberExample" before the session is created.
     * <p>
     * On any failure the problem is printed, the connection (if one was
     * created) is closed, and the JVM exits with status 1.
     */
    public DurableSubscriber() {
        ConnectionFactory connectionFactory = null;
        try {
            connectionFactory =
                SampleUtilities.getConnectionFactory();
            connection =
                connectionFactory.createConnection();
            connection.setClientID("DurableSubscriberExample");
            session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            topic = SampleUtilities.getTopic(topicName, session);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // Best-effort cleanup: the process exits right below,
                    // so a failed close cannot be handled any further.
                }
            }
            System.exit(1);
        }
    }

    /**
     * Stops connection, then creates durable subscriber, registers message
     * listener (TextListener), and starts message delivery; listener
     * displays the messages obtained.
     * <p>
     * If any step fails, the error is printed, exitResult is set to 1 and
     * the listener reference is cleared so that closeSubscriber does not
     * wait for a control message on a subscription that never started.
     */
    public void startSubscriber() {
        try {
            System.out.println("Starting subscriber");
            // Delivery stays paused while the subscriber and its listener
            // are being set up.
            connection.stop();
            topicSubscriber = session.createDurableSubscriber(topic,
                SUBSCRIPTION_NAME);
            topicListener = new TextListener();
            topicSubscriber.setMessageListener(topicListener);
            connection.start();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
            topicListener = null;
        }
    }

    /**
     * Blocks until publisher issues a control message indicating
     * end of publish stream, then closes subscriber.
     * <p>
     * Does not wait when no listener is active (startSubscriber failed or
     * was never called) and closes the subscriber only if one exists. Both
     * references are cleared afterwards, so a later failed restart cannot
     * reuse the state of this round.
     */
    public void closeSubscriber() {
        try {
            if (topicListener != null) {
                topicListener.monitor.waitTillDone();
            }
            if (topicSubscriber != null) {
                System.out.println("Closing subscriber");
                topicSubscriber.close();
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        } finally {
            topicSubscriber = null;
            topicListener = null;
        }
    }

    /**
     * Removes the durable subscription and closes the connection.
     * <p>
     * The connection is closed even when unsubscribing fails; either
     * failure is printed and recorded by setting exitResult to 1.
     */
    public void finish() {
        if (connection == null) {
            return;
        }
        try {
            session.unsubscribe(SUBSCRIPTION_NAME);
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        } finally {
            try {
                connection.close();
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            }
        }
    }
}
/**
 * The MultiplePublisher class publishes several messages to a topic. It
 * contains a constructor, a publishMessages method, and a finish method.
 * <p>
 * JMS failures after construction are reported on standard output and
 * recorded by setting the enclosing example's exitResult to 1.
 *
 * @author Kim Haase
 * @version 1.6, 08/18/00
 */
public class MultiplePublisher {
    Connection connection = null;
    Session session = null;
    Topic topic = null;
    MessageProducer topicPublisher = null;

    /**
     * Constructor: looks up a connection factory and topic and creates a
     * connection and session. Also creates the producer.
     * <p>
     * On any failure the problem is printed, the connection (if one was
     * created) is closed, and the JVM exits with status 1.
     */
    public MultiplePublisher() {
        ConnectionFactory connectionFactory = null;
        try {
            connectionFactory =
                SampleUtilities.getConnectionFactory();
            connection =
                connectionFactory.createConnection();
            session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            topic = SampleUtilities.getTopic(topicName, session);
            topicPublisher = session.createProducer(topic);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // Best-effort cleanup: the process exits right below,
                    // so a failed close cannot be handled any further.
                }
            }
            System.exit(1);
        }
    }

    /**
     * Creates text message.
     * Sends some messages, varying text slightly.
     * Messages must be persistent.
     * <p>
     * Three text messages are sent, numbered from startindex + 1, followed
     * by one non-text control message that marks the end of the round.
     * startindex is advanced only after the whole round was sent, so a
     * failed round does not shift the numbering.
     */
    public void publishMessages() {
        final int NUMMSGS = 3;
        final String MSG_TEXT = "Here is a message";
        final int end = startindex + NUMMSGS;
        try {
            // A single TextMessage object is reused; only its text changes.
            TextMessage message = session.createTextMessage();
            for (int i = startindex; i < end; i++) {
                message.setText(MSG_TEXT + " " + (i + 1));
                System.out.println("PUBLISHER: Publishing message: "
                    + message.getText());
                topicPublisher.send(message);
            }
            // Send a non-text control message indicating end of messages.
            topicPublisher.send(session.createMessage());
            startindex = end;
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        }
    }

    /**
     * Closes the connection. A failure is printed and recorded by setting
     * exitResult to 1.
     */
    public void finish() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        }
    }
}
/**
 * Instantiates the subscriber and publisher classes.
 *
 * Starts the subscriber; the publisher publishes some messages.
 *
 * Closes the subscriber; while it is closed, the publisher publishes
 * some more messages.
 *
 * Restarts the subscriber and fetches the messages.
 *
 * Finally, closes the connections. The connections are closed even if an
 * unchecked exception escapes from one of the steps above; the exception
 * still propagates to the caller afterwards.
 */
public void run_program() {
    DurableSubscriber durableSubscriber = new DurableSubscriber();
    MultiplePublisher multiplePublisher = new MultiplePublisher();
    try {
        // Round 1: subscriber is active while the messages are published.
        durableSubscriber.startSubscriber();
        multiplePublisher.publishMessages();
        durableSubscriber.closeSubscriber();

        // Round 2: messages are published while the subscriber is closed,
        // then the subscriber restarts and receives them.
        multiplePublisher.publishMessages();
        durableSubscriber.startSubscriber();
        durableSubscriber.closeSubscriber();
    } finally {
        // Same order as before: publisher first, then subscriber.
        try {
            multiplePublisher.finish();
        } finally {
            durableSubscriber.finish();
        }
    }
}
/**
 * Reads the topic name from the command line, then calls the
 * run_program method.
 * <p>
 * Prints a usage message and exits with status 1 unless exactly one
 * argument is given. Otherwise the program ends through
 * SampleUtilities.exit with the example's exitResult.
 *
 * @param args the topic used by the example
 */
public static void main(String[] args) {
    // Validate the arguments before creating anything.
    if (args.length != 1) {
        System.out.println("Usage: java DurableSubscriberExample <topic_name>");
        System.exit(1);
    }
    DurableSubscriberExample dse = new DurableSubscriberExample();
    // Strings are immutable, so the argument is used directly, not copied.
    dse.topicName = args[0];
    System.out.println("Topic name is " + dse.topicName);
    dse.run_program();
    SampleUtilities.exit(dse.exitResult);
}
}
|