001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.cache.invalidation.bridges;
023:
024: import java.io.Serializable;
025: import java.util.HashSet;
026: import java.util.HashMap;
027: import java.util.Iterator;
028:
029: import javax.jms.MessageListener;
030: import javax.jms.TopicConnection;
031: import javax.jms.TopicSession;
032: import javax.jms.Topic;
033: import javax.jms.TopicSubscriber;
034: import javax.jms.TopicPublisher;
035: import javax.jms.Message;
036: import javax.jms.ObjectMessage;
037: import javax.jms.TopicConnectionFactory;
038: import javax.jms.JMSException;
039: import javax.naming.InitialContext;
040: import javax.naming.NamingException;
041: import javax.naming.Context;
042:
043: import org.jboss.cache.invalidation.InvalidationManager;
044: import org.jboss.cache.invalidation.InvalidationBridgeListener;
045: import org.jboss.cache.invalidation.BatchInvalidation;
046: import org.jboss.system.ServiceMBeanSupport;
047:
048: /**
049: * JMS implementation of a cache invalidation bridge
050: *
051: * Based on previous code of Bill Burke based on interceptors
052: *
053: * @see org.jboss.cache.invalidation.InvalidationManagerMBean
054: *
055: * @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
056: * @author <a href="mailto:bill@jboss.org">Bill Burke</a>.
057: * @version $Revision: 57209 $
058: *
059: * <p><b>Revisions:</b>
060: *
061: * <p><b>28 septembre 2002 Sacha Labourey:</b>
062: * <ul>
063: * <li> First implementation </li>
064: * </ul>
065: */
066:
067: public class JMSCacheInvalidationBridge extends ServiceMBeanSupport
068: implements JMSCacheInvalidationBridgeMBean,
069: InvalidationBridgeListener, MessageListener {
070: // Constants -----------------------------------------------------
071:
072: public static final String JMS_CACHE_INVALIDATION_BRIDGE = "JMS_CACHE_INVALIDATION_BRIDGE";
073:
074: // Attributes ----------------------------------------------------
075:
076: // JMX Attributes
077: //
078: protected org.jboss.cache.invalidation.InvalidationManagerMBean invalMgr = null;
079: protected org.jboss.cache.invalidation.BridgeInvalidationSubscription invalidationSubscription = null;
080: protected String invalidationManagerName = InvalidationManager.DEFAULT_JMX_SERVICE_NAME;
081:
082: protected boolean publishingAuthorized = false;
083: protected String connectionFactoryName = "java:/ConnectionFactory";
084: protected String topicName = "topic/JMSCacheInvalidationBridge";
085: protected boolean transacted = true;
086: protected int acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; // AUTO_ACK by default
087: protected int propagationMode = JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION; // IN_OUT by default
088:
089: protected java.rmi.dgc.VMID serviceId = new java.rmi.dgc.VMID();
090:
091: protected TopicConnection conn = null;
092: protected TopicSession session = null;
093: protected Topic topic = null;
094: protected TopicSubscriber subscriber = null;
095: protected TopicPublisher pub = null;
096:
097: protected String providerUrl = null;
098:
099: // Static --------------------------------------------------------
100:
101: // Constructors --------------------------------------------------
102:
103: public JMSCacheInvalidationBridge() {
104: super ();
105: }
106:
107: // Public --------------------------------------------------------
108:
109: // *MBean implementation ----------------------------------------------
110:
111: public String getInvalidationManager() {
112: return this .invalidationManagerName;
113: }
114:
115: public void setInvalidationManager(String objectName) {
116: this .invalidationManagerName = objectName;
117: }
118:
119: public String getConnectionFactoryName() {
120: return this .connectionFactoryName;
121: }
122:
123: public void setConnectionFactoryName(String factoryName) {
124: this .connectionFactoryName = factoryName;
125: }
126:
127: public String getTopicName() {
128: return this .topicName;
129: }
130:
131: public void setTopicName(String topicName) {
132: this .topicName = topicName;
133: }
134:
135: public String getProviderUrl() {
136: return providerUrl;
137: }
138:
139: public void setProviderUrl(String providerUrl) {
140: this .providerUrl = providerUrl;
141: }
142:
143: public boolean isTransacted() {
144: return this .transacted;
145: }
146:
147: public void setTransacted(boolean isTransacted) {
148: this .transacted = isTransacted;
149: }
150:
151: public int getAcknowledgeMode() {
152: return this .acknowledgeMode;
153: }
154:
155: public void setAcknowledgeMode(int ackMode) {
156: if (ackMode > 3 || ackMode < 1)
157: throw new RuntimeException(
158: "Value AcknowledgeMode must be between 1 and 3");
159:
160: switch (ackMode) {
161: case 1:
162: this .acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE;
163: break;
164: case 2:
165: this .acknowledgeMode = TopicSession.CLIENT_ACKNOWLEDGE;
166: break;
167: case 3:
168: this .acknowledgeMode = TopicSession.DUPS_OK_ACKNOWLEDGE;
169: break;
170: }
171: }
172:
173: public int getPropagationMode() {
174: return this .propagationMode;
175: }
176:
177: public void setPropagationMode(int propMode) {
178: if (propMode > 3 || propMode < 1)
179: throw new RuntimeException(
180: "Value PropagationMode must be between 1 and 3");
181:
182: this .propagationMode = propMode;
183: }
184:
185: // MessageListener implementation ----------------------------------------------
186:
187: public void onMessage(Message msg) {
188: // just to make sure we are in the good mode
189: //
190: if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
191: || this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) {
192: try {
193: ObjectMessage objmsg = (ObjectMessage) msg;
194: if (!objmsg.getJMSType().equals(
195: JMS_CACHE_INVALIDATION_BRIDGE))
196: return;
197: JMSCacheInvalidationMessage content = (JMSCacheInvalidationMessage) objmsg
198: .getObject();
199:
200: // Not very efficient as the whole message must be unserialized just to check
201: // if we were the emitter. Maybe wrapping this in a byte array would be more efficient
202: //
203: if (!content.emitter.equals(this .serviceId)) {
204: if (content.invalidateAllGroupName != null) {
205: invalidationSubscription
206: .invalidateAll(content.invalidateAllGroupName);
207: } else {
208: invalidationSubscription
209: .batchInvalidate(content
210: .getInvalidations());
211: }
212: }
213: } catch (Exception ex) {
214: log.warn(ex.getMessage());
215: }
216: }
217: }
218:
219: // InvalidationBridgeListener implementation ----------------------------------------------
220:
221: public void batchInvalidate(BatchInvalidation[] invalidations,
222: boolean asynchronous) {
223: if ((this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
224: && this .publishingAuthorized) {
225: JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
226: this .serviceId, invalidations);
227: this .sendJMSInvalidationEvent(msg);
228: }
229: }
230:
231: public void invalidate(String invalidationGroupName,
232: Serializable[] keys, boolean asynchronous) {
233: if ((this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
234: && this .publishingAuthorized) {
235: JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
236: this .serviceId, invalidationGroupName, keys);
237: this .sendJMSInvalidationEvent(msg);
238: }
239: }
240:
241: public void invalidate(String invalidationGroupName,
242: Serializable key, boolean asynchronous) {
243: if ((this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
244: && this .publishingAuthorized) {
245: JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
246: this .serviceId, invalidationGroupName,
247: new Serializable[] { key });
248: this .sendJMSInvalidationEvent(msg);
249: }
250: }
251:
252: public void invalidateAll(String groupName, boolean asynchronous) {
253: if ((this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
254: && this .publishingAuthorized) {
255: JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
256: this .serviceId, groupName);
257: this .sendJMSInvalidationEvent(msg);
258: }
259: }
260:
261: public void newGroupCreated(String groupInvalidationName) {
262: // we don't manage groups dynamically, so we don't really care...
263: //
264: }
265:
266: public void groupIsDropped(String groupInvalidationName) {
267: // we don't manage groups dynamically, so we don't really care...
268: //
269: }
270:
271: // ServiceMBeanSupport overrides ---------------------------------------------------
272:
273: protected void startService() throws Exception {
274: log.info("Starting JMS cache invalidation bridge");
275:
276: // Deal with the InvalidationManager first..
277: //
278: this .invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean) org.jboss.system.Registry
279: .lookup(this .invalidationManagerName);
280:
281: this .invalidationSubscription = invalMgr
282: .registerBridgeListener(this );
283:
284: // deal with JMS next
285: //
286: InitialContext iniCtx = getInitialContext();
287:
288: Object tmp = iniCtx.lookup(this .connectionFactoryName);
289: TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
290: conn = tcf.createTopicConnection();
291:
292: topic = (Topic) iniCtx.lookup(this .topicName);
293: session = conn.createTopicSession(this .transacted,
294: this .acknowledgeMode);
295:
296: conn.start();
297:
298: // Are we publisher, subscriber, or both?
299: //
300: if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
301: || this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) {
302: this .subscriber = session.createSubscriber(topic);
303: this .subscriber.setMessageListener(this );
304: }
305:
306: if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
307: || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) {
308: this .pub = session.createPublisher(topic);
309: this .publishingAuthorized = true;
310: }
311: }
312:
313: protected void stopService() {
314: log.info("Stoping JMS cache invalidation bridge");
315: try {
316: if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
317: || this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) {
318: subscriber.close();
319: }
320:
321: if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
322: || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) {
323: this .publishingAuthorized = false;
324: pub.close();
325: }
326:
327: conn.stop();
328: session.close();
329: conn.close();
330:
331: } catch (Exception ex) {
332: log
333: .warn(
334: "Failed to stop JMS resources associated with the JMS bridge: ",
335: ex);
336: }
337: }
338:
339: // Package protected ---------------------------------------------
340:
341: // Protected -----------------------------------------------------
342:
343: protected synchronized TopicSession getSession() {
344: return this .session;
345: }
346:
347: protected synchronized TopicPublisher getPublisher() {
348: return this .pub;
349: }
350:
351: protected void sendJMSInvalidationEvent(
352: JMSCacheInvalidationMessage invalidationMsg) {
353: try {
354: if (log.isTraceEnabled())
355: log.trace("sending JMS message for cache invalidation"
356: + invalidationMsg);
357:
358: try {
359: ObjectMessage msg = getSession().createObjectMessage();
360: msg.setJMSType(JMS_CACHE_INVALIDATION_BRIDGE);
361: msg.setObject(invalidationMsg);
362: getPublisher().publish(msg);
363: } catch (JMSException ex) {
364: log.debug("failed to publish seppuku event: ", ex);
365: }
366: } catch (Exception ex) {
367: log.warn("failed to do cluster seppuku event: ", ex);
368: }
369: }
370:
371: protected InitialContext getInitialContext() throws NamingException {
372: if (providerUrl == null) {
373: return new InitialContext();
374: } else {
375: if (log.isDebugEnabled())
376: log.debug("Using Context.PROVIDER_URL: " + providerUrl);
377:
378: java.util.Properties props = new java.util.Properties(
379: System.getProperties());
380: props.put(Context.PROVIDER_URL, providerUrl);
381: return new InitialContext(props);
382: }
383: }
384:
385: // Private -------------------------------------------------------
386:
387: // Inner classes -------------------------------------------------
388:
389: }
|