001: /*
002: * Copyright (c) 2002-2003 by OpenSymphony
003: * All rights reserved.
004: */
005: package com.opensymphony.oscache.plugins.clustersupport;
006:
007: import com.opensymphony.oscache.base.Cache;
008: import com.opensymphony.oscache.base.Config;
009: import com.opensymphony.oscache.base.FinalizationException;
010: import com.opensymphony.oscache.base.InitializationException;
011:
012: import org.apache.commons.logging.Log;
013: import org.apache.commons.logging.LogFactory;
014:
015: import javax.jms.*;
016:
017: import javax.naming.InitialContext;
018: import javax.naming.NamingException;
019:
020: /**
021: * A JMS based clustering implementation. This implementation is independent of the
022: * JMS provider and uses non-persistent messages on a publish subscribe protocol.
023: *
024: * @author <a href="mailto:motoras@linuxmail.org">Romulus Pasca</a>
025: */
026: public class JMSBroadcastingListener extends
027: AbstractBroadcastingListener {
028:
029: private final static Log log = LogFactory
030: .getLog(JMSBroadcastingListener.class);
031:
032: /**
033: *The JMS connection used
034: */
035: private Connection connection;
036:
037: /**
038: * Th object used to publish new messages
039: */
040: private MessageProducer messagePublisher;
041:
042: /**
043: * The current JMS session
044: */
045: private Session publisherSession;
046:
047: /**
048: * The name of this cluster. Used to identify the sender of a message.
049: */
050: private String clusterNode;
051:
052: /**
053: * <p>Called by the cache administrator class when a cache is instantiated.</p>
054: * <p>The JMS broadcasting implementation requires the following configuration
055: * properties to be specified in <code>oscache.properties</code>:
056: * <ul>
057: * <li><b>cache.cluster.jms.topic.factory</b> - The JMS connection factory to use</li>
058: * <li><b>cache.cluster.jms.topic.name</b> - The JMS topic name</li>
059: * <li><b>cache.cluster.jms.node.name</b> - The name of this node in the cluster. This
060: * should be unique for each node.</li>
061: * Please refer to the clustering documentation for further details on configuring
062: * the JMS clustered caching.</p>
063: *
064: * @param cache the cache instance that this listener is attached to.
065: *
066: * @throws com.opensymphony.oscache.base.InitializationException thrown when there was a
067: * problem initializing the listener. The cache administrator will log this error and
068: * disable the listener.
069: */
070: public void initialize(Cache cache, Config config)
071: throws InitializationException {
072: super .initialize(cache, config);
073:
074: // Get the name of this node
075: clusterNode = config.getProperty("cache.cluster.jms.node.name");
076:
077: String topic = config
078: .getProperty("cache.cluster.jms.topic.name");
079: String topicFactory = config
080: .getProperty("cache.cluster.jms.topic.factory");
081:
082: if (log.isInfoEnabled()) {
083: log.info("Starting JMS clustering (node name="
084: + clusterNode + ", topic=" + topic
085: + ", topic factory=" + topicFactory + ")");
086: }
087:
088: try {
089: // Make sure you have specified the necessary JNDI properties (usually in
090: // a jndi.properties resource file, or as system properties)
091: InitialContext jndi = getInitialContext();
092:
093: // Look up a JMS connection factory
094: ConnectionFactory connectionFactory = (ConnectionFactory) jndi
095: .lookup(topicFactory);
096:
097: // Create a JMS connection
098: connection = connectionFactory.createConnection();
099:
100: // Create session objects
101: publisherSession = connection.createSession(false,
102: Session.AUTO_ACKNOWLEDGE);
103:
104: Session subSession = connection.createSession(false,
105: Session.AUTO_ACKNOWLEDGE);
106:
107: // Look up the JMS topic
108: Topic chatTopic = (Topic) jndi.lookup(topic);
109:
110: // Create the publisher and subscriber
111: messagePublisher = publisherSession
112: .createProducer(chatTopic);
113:
114: MessageConsumer messageConsumer = subSession
115: .createConsumer(chatTopic);
116:
117: // Set the message listener
118: messageConsumer.setMessageListener(new MessageListener() {
119: public void onMessage(Message message) {
120: try {
121: //check the message type
122: ObjectMessage objectMessage = null;
123:
124: if (!(message instanceof ObjectMessage)) {
125: log
126: .error("Cannot handle message of type (class="
127: + message.getClass()
128: .getName()
129: + "). Notification ignored.");
130: return;
131: }
132:
133: objectMessage = (ObjectMessage) message;
134:
135: //check the message content
136: if (!(objectMessage.getObject() instanceof ClusterNotification)) {
137: log
138: .error("An unknown cluster notification message received (class="
139: + objectMessage.getObject()
140: .getClass()
141: .getName()
142: + "). Notification ignored.");
143: return;
144: }
145:
146: if (log.isDebugEnabled()) {
147: log.debug(objectMessage.getObject());
148: }
149:
150: // This prevents the notification sent by this node from being handled by itself
151: if (!objectMessage
152: .getStringProperty("nodeName").equals(
153: clusterNode)) {
154: //now handle the message
155: ClusterNotification notification = (ClusterNotification) objectMessage
156: .getObject();
157: handleClusterNotification(notification);
158: }
159: } catch (JMSException jmsEx) {
160: log.error("Cannot handle cluster Notification",
161: jmsEx);
162: }
163: }
164: });
165:
166: // Start the JMS connection; allows messages to be delivered
167: connection.start();
168: } catch (Exception e) {
169: throw new InitializationException(
170: "Initialization of the JMSBroadcastingListener failed: "
171: + e);
172: }
173: }
174:
175: /**
176: * Called by the cache administrator class when a cache is destroyed.
177: *
178: * @throws com.opensymphony.oscache.base.FinalizationException thrown when there was a problem finalizing the
179: * listener. The cache administrator will catch and log this error.
180: */
181: public void finialize() throws FinalizationException {
182: try {
183: if (log.isInfoEnabled()) {
184: log.info("Shutting down JMS clustering...");
185: }
186:
187: connection.close();
188:
189: if (log.isInfoEnabled()) {
190: log.info("JMS clustering shutdown complete.");
191: }
192: } catch (JMSException e) {
193: log
194: .warn(
195: "A problem was encountered when closing the JMS connection",
196: e);
197: }
198: }
199:
200: protected void sendNotification(ClusterNotification message) {
201: try {
202: ObjectMessage objectMessage = publisherSession
203: .createObjectMessage();
204: objectMessage.setObject(message);
205:
206: //sign the message, with the name of this node
207: objectMessage.setStringProperty("nodeName", clusterNode);
208: messagePublisher.send(objectMessage);
209: } catch (JMSException e) {
210: log.error("Cannot send notification " + message, e);
211: }
212: }
213:
214: /**
215: * @return creates a context for performing naming operations.
216: * @throws NamingException if a naming exception is encountered
217: */
218: protected InitialContext getInitialContext() throws NamingException {
219: return new InitialContext();
220: }
221:
222: }
|