001: /*
002: * Copyright (c) 2002-2003 by OpenSymphony
003: * All rights reserved.
004: */
005: package com.opensymphony.oscache.plugins.clustersupport;
006:
007: import com.opensymphony.oscache.base.Cache;
008: import com.opensymphony.oscache.base.Config;
009: import com.opensymphony.oscache.base.FinalizationException;
010: import com.opensymphony.oscache.base.InitializationException;
011:
012: import org.apache.commons.logging.Log;
013: import org.apache.commons.logging.LogFactory;
014:
015: import javax.jms.*;
016:
017: import javax.naming.InitialContext;
018:
019: /**
020: * A JMS 1.0.x based clustering implementation. This implementation is independent of
021: * the JMS provider and uses non-persistent messages on a publish subscribe protocol.
022: *
023: * @author <a href="mailto:chris@swebtec.com">Chris Miller</a>
024: */
025: public class JMS10BroadcastingListener extends
026: AbstractBroadcastingListener {
027: private final static Log log = LogFactory
028: .getLog(JMS10BroadcastingListener.class);
029:
030: /**
031: * The name of this cluster. Used to identify the sender of a message.
032: */
033: private String clusterNode;
034:
035: /**
036: *The JMS connection used
037: */
038: private TopicConnection connection;
039:
040: /**
041: * Th object used to publish new messages
042: */
043: private TopicPublisher publisher;
044:
045: /**
046: * The current JMS session
047: */
048: private TopicSession publisherSession;
049:
050: /**
051: * <p>Called by the cache administrator class when a cache is instantiated.</p>
052: * <p>The JMS broadcasting implementation requires the following configuration
053: * properties to be specified in <code>oscache.properties</code>:
054: * <ul>
055: * <li><b>cache.cluster.jms.topic.factory</b> - The JMS connection factory to use</li>
056: * <li><b>cache.cluster.jms.topic.name</b> - The JMS topic name</li>
057: * <li><b>cache.cluster.jms.node.name</b> - The name of this node in the cluster. This should
058: * be unique for each node.</li>
059: * Please refer to the clustering documentation for further details on configuring
060: * the JMS clustered caching.</p>
061: *
062: * @param cache the cache instance that this listener is attached to.
063: *
064: * @throws InitializationException thrown when there was a
065: * problem initializing the listener. The cache administrator will log this error and
066: * disable the listener.
067: */
068: public void initialize(Cache cache, Config config)
069: throws InitializationException {
070: super .initialize(cache, config);
071:
072: // Get the name of this node
073: clusterNode = config.getProperty("cache.cluster.jms.node.name");
074:
075: String topic = config
076: .getProperty("cache.cluster.jms.topic.name");
077: String topicFactory = config
078: .getProperty("cache.cluster.jms.topic.factory");
079:
080: if (log.isInfoEnabled()) {
081: log.info("Starting JMS clustering (node name="
082: + clusterNode + ", topic=" + topic
083: + ", topic factory=" + topicFactory + ")");
084: }
085:
086: try {
087: // Make sure you have specified the necessary JNDI properties (usually in
088: // a jndi.properties resource file, or as system properties)
089: InitialContext jndi = new InitialContext();
090:
091: // Look up a JMS connection factory
092: TopicConnectionFactory connectionFactory = (TopicConnectionFactory) jndi
093: .lookup(topicFactory);
094:
095: // Create a JMS connection
096: connection = connectionFactory.createTopicConnection();
097:
098: // Create session objects
099: publisherSession = connection.createTopicSession(false,
100: Session.AUTO_ACKNOWLEDGE);
101:
102: TopicSession subSession = connection.createTopicSession(
103: false, Session.AUTO_ACKNOWLEDGE);
104:
105: // Look up the JMS topic
106: Topic chatTopic = (Topic) jndi.lookup(topic);
107:
108: // Create the publisher and subscriber
109: publisher = publisherSession.createPublisher(chatTopic);
110:
111: TopicSubscriber subscriber = subSession
112: .createSubscriber(chatTopic);
113:
114: // Set the message listener
115: subscriber.setMessageListener(new MessageListener() {
116: public void onMessage(Message message) {
117: try {
118: //check the message type
119: ObjectMessage objectMessage = null;
120:
121: if (!(message instanceof ObjectMessage)) {
122: log
123: .error("Cannot handle message of type (class="
124: + message.getClass()
125: .getName()
126: + "). Notification ignored.");
127: return;
128: }
129:
130: objectMessage = (ObjectMessage) message;
131:
132: //check the message content
133: if (!(objectMessage.getObject() instanceof ClusterNotification)) {
134: log
135: .error("An unknown cluster notification message received (class="
136: + objectMessage.getObject()
137: .getClass()
138: .getName()
139: + "). Notification ignored.");
140: return;
141: }
142:
143: if (log.isDebugEnabled()) {
144: log.debug(objectMessage.getObject());
145: }
146:
147: // This prevents the notification sent by this node from being handled by itself
148: if (!objectMessage
149: .getStringProperty("nodeName").equals(
150: clusterNode)) {
151: //now handle the message
152: ClusterNotification notification = (ClusterNotification) objectMessage
153: .getObject();
154: handleClusterNotification(notification);
155: }
156: } catch (JMSException jmsEx) {
157: log.error("Cannot handle cluster Notification",
158: jmsEx);
159: }
160: }
161: });
162:
163: // Start the JMS connection; allows messages to be delivered
164: connection.start();
165: } catch (Exception e) {
166: throw new InitializationException(
167: "Initialization of the JMS10BroadcastingListener failed: "
168: + e);
169: }
170: }
171:
172: /**
173: * Called by the cache administrator class when a cache is destroyed.
174: *
175: * @throws FinalizationException thrown when there was a problem finalizing the
176: * listener. The cache administrator will catch and log this error.
177: */
178: public void finialize() throws FinalizationException {
179: try {
180: if (log.isInfoEnabled()) {
181: log.info("Shutting down JMS clustering...");
182: }
183:
184: connection.close();
185:
186: if (log.isInfoEnabled()) {
187: log.info("JMS clustering shutdown complete.");
188: }
189: } catch (JMSException e) {
190: log
191: .warn(
192: "A problem was encountered when closing the JMS connection",
193: e);
194: }
195: }
196:
197: protected void sendNotification(ClusterNotification message) {
198: try {
199: ObjectMessage objectMessage = publisherSession
200: .createObjectMessage();
201: objectMessage.setObject(message);
202:
203: //sign the message, with the name of this node
204: objectMessage.setStringProperty("nodeName", clusterNode);
205: publisher.publish(objectMessage);
206: } catch (JMSException e) {
207: log.error("Cannot send notification " + message, e);
208: }
209: }
210: }
|