001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.cocoon.components.slide.impl;
018:
019: import java.util.ArrayList;
020: import java.util.Collections;
021: import java.util.Hashtable;
022: import java.util.Iterator;
023: import java.util.List;
024:
025: import javax.jms.DeliveryMode;
026: import javax.jms.JMSException;
027: import javax.jms.Session;
028: import javax.jms.Topic;
029: import javax.jms.TopicConnection;
030: import javax.jms.TopicConnectionFactory;
031: import javax.jms.TopicPublisher;
032: import javax.jms.TopicSession;
033: import javax.naming.Context;
034: import javax.naming.InitialContext;
035: import javax.naming.NamingException;
036:
037: import org.apache.slide.common.NamespaceAccessToken;
038: import org.apache.slide.common.ServiceAccessException;
039: import org.apache.slide.common.SlideToken;
040: import org.apache.slide.content.AbstractContentInterceptor;
041: import org.apache.slide.content.NodeRevisionContent;
042: import org.apache.slide.content.NodeRevisionDescriptor;
043: import org.apache.slide.content.NodeRevisionDescriptors;
044: import org.apache.slide.lock.ObjectLockedException;
045: import org.apache.slide.security.AccessDeniedException;
046: import org.apache.slide.structure.LinkedObjectNotFoundException;
047: import org.apache.slide.structure.ObjectNotFoundException;
048: import org.apache.slide.util.logger.Logger;
049:
050: /**
051: * A ContentInterceptor for Slide that publishes
052: * invalidation events to a JMS topic.
053: */
054: public class JMSContentInterceptor extends AbstractContentInterceptor {
055:
056: // ---------------------------------------------------- constants
057:
058: private static final String LOG_CHANNEL = "JMSContentInterceptor";
059:
060: private static final String PARAM_TOPIC_FACTORY = "topic-factory";
061: private static final String PARAM_TOPIC = "topic";
062: private static final String PARAM_PERSISTENT = "persistent-delivery";
063: private static final String PARAM_PRIORITY = "priority";
064: private static final String PARAM_TIME_TO_LIVE = "time-to-live";
065: private static final String PARAM_INITIAL_CONTEXT_FACTORY = Context.INITIAL_CONTEXT_FACTORY;
066: private static final String PARAM_PROVIDER_URL = Context.PROVIDER_URL;
067:
068: private static final String DEFAULT_TOPIC_FACTORY = "JmsTopicConnectionFactory";
069: private static final String DEFAULT_TOPIC = "topic1";
070: private static final String DEFAULT_PERSISTENT = "false";
071: private static final String DEFAULT_PRIORITY = "4";
072: private static final String DEFAULT_TIME_TO_LIVE = "1000";
073: private static final String DEFAULT_INITIAL_CONTEXT_FACTORY = "org.exolab.jms.jndi.InitialContextFactory";
074: private static final String DEFAULT_PROVIDER_URL = "rmi://localhost:1099/";
075:
076: // ---------------------------------------------------- member variables
077:
078: // JMS objects
079: private TopicConnection m_connection;
080: private TopicSession m_session;
081: private Topic m_topic;
082: private TopicPublisher m_publisher;
083:
084: // configuration options
085: private String m_topicFactoryName;
086: private String m_topicName;
087: private int m_deliveryMode;
088: private int m_priority;
089: private long m_timeToLive;
090: private Hashtable m_jndiProps;
091:
092: // queue of messages to be published
093: private List m_queue = Collections
094: .synchronizedList(new ArrayList());
095:
096: // the started state of the interceptor
097: private boolean m_started = false;
098:
099: // ---------------------------------------------------- lifecycle
100:
101: public JMSContentInterceptor() {
102: }
103:
104: /**
105: * Configure the interceptor.
106: * <p>
107: * The following parameters are recognized:
108: * <ul>
109: * <li>
110: * <code>java.naming.factory.initial</code> [<code>org.exolab.jms.jndi.InitialContextFactory</code>]
111: * - initial jndi context factory.
112: * </li>
113: * <li>
114: * <code>java.naming.provider.url</code> [<code>rmi://localhost:1099/</code>] - jndi provider url.
115: * </li>
116: * <li>
117: * <code>topic-factory<code> [<code>JmsTopicConnectionFactory</code>] - the JNDI lookup name
118: * of the JMS TopicConnectionFactory.
119: * </li>
120: * <li>
121: * <code>topic</code> [<code>topic1</code>] - the name of the topic to publish messages to.
122: * </li>
123: * <li>
124: * <code>persistent-delivery</code> [<code>false</code>] - message delivery mode.
125: * </li>
126: * <li>
127: * <code>priority</code> [<code>4</code>] - message priority.
128: * </li>
129: * <li>
130: * <code>time-to-live</code> [<code>1000</code>] - message lifetime in ms.
131: * </li>
132: * </ul>
133: * </p>
134: */
135: public void setParameters(Hashtable params) {
136: super .setParameters(params);
137:
138: // parse the JMS related parameters
139: m_topicFactoryName = getParameter(PARAM_TOPIC_FACTORY,
140: DEFAULT_TOPIC_FACTORY);
141: m_topicName = getParameter(PARAM_TOPIC, DEFAULT_TOPIC);
142: boolean persistent = Boolean.valueOf(
143: getParameter(PARAM_PERSISTENT, DEFAULT_PERSISTENT))
144: .booleanValue();
145:
146: m_deliveryMode = persistent ? DeliveryMode.PERSISTENT
147: : DeliveryMode.NON_PERSISTENT;
148: m_priority = Integer.valueOf(
149: getParameter(PARAM_PRIORITY, DEFAULT_PRIORITY))
150: .intValue();
151: m_timeToLive = Long.valueOf(
152: getParameter(PARAM_TIME_TO_LIVE, DEFAULT_TIME_TO_LIVE))
153: .longValue();
154:
155: // parse the JNDI related parameters
156: m_jndiProps = new Hashtable();
157: m_jndiProps.put(Context.INITIAL_CONTEXT_FACTORY, getParameter(
158: PARAM_INITIAL_CONTEXT_FACTORY,
159: DEFAULT_INITIAL_CONTEXT_FACTORY));
160: m_jndiProps.put(Context.PROVIDER_URL, getParameter(
161: PARAM_PROVIDER_URL, DEFAULT_PROVIDER_URL));
162: }
163:
164: /**
165: * Sets up the JMS connection.
166: */
167: public void setNamespace(NamespaceAccessToken nat) {
168: super .setNamespace(nat);
169:
170: // setup the JMS connection and session
171: try {
172: Context context = new InitialContext(m_jndiProps);
173: TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) context
174: .lookup(m_topicFactoryName);
175: m_connection = topicConnectionFactory
176: .createTopicConnection();
177: m_connection.start();
178: m_session = m_connection.createTopicSession(false,
179: Session.DUPS_OK_ACKNOWLEDGE);
180: m_topic = m_session.createTopic(m_topicName);
181: m_publisher = m_session.createPublisher(m_topic);
182:
183: /* a background thread does the actual publishing
184: * of messages. This is because JMS Session
185: * and its derivatives are single threaded
186: * and so if we were to publish messages synchronously
187: * we'd have to create a new Session
188: * each time we need to publish a message.
189: */
190: Thread t = new Thread(new Runnable() {
191: public void run() {
192: m_started = true;
193: while (m_started) {
194: try {
195: Thread.sleep(1000);
196: } catch (InterruptedException e) {
197: // continue
198: }
199: if (m_queue.size() == 0)
200: continue;
201: List list = m_queue;
202: m_queue = Collections
203: .synchronizedList(new ArrayList());
204: Iterator iter = list.iterator();
205: while (iter.hasNext()) {
206: String msg = (String) iter.next();
207: if (getLogger().isEnabled(Logger.INFO)) {
208: getLogger().log(
209: "Sending message: " + msg,
210: Logger.INFO);
211: }
212: try {
213: m_publisher.publish(m_session
214: .createTextMessage(msg),
215: m_deliveryMode, m_priority,
216: m_timeToLive);
217: } catch (JMSException e) {
218: getLogger().log(
219: "Failure sending JMS message.",
220: e, LOG_CHANNEL, Logger.ERROR);
221: }
222: }
223: }
224: }
225: });
226: t.setPriority(Thread.NORM_PRIORITY);
227: t.start();
228: } catch (NamingException e) {
229: getLogger().log("Failure while connecting to JMS server.",
230: e, LOG_CHANNEL, Logger.ERROR);
231: } catch (JMSException e) {
232: getLogger().log("Failure while connecting to JMS server.",
233: e, LOG_CHANNEL, Logger.ERROR);
234: }
235:
236: }
237:
238: private String getParameter(String name, String defaultValue) {
239: String value = super .getParameter(name);
240: if (value == null) {
241: value = defaultValue;
242: }
243: return value;
244: }
245:
246: // ---------------------------------------------------- interception methods
247:
248: public void postRemoveContent(SlideToken slideToken,
249: NodeRevisionDescriptors revisions,
250: NodeRevisionDescriptor revision)
251: throws AccessDeniedException, ObjectNotFoundException,
252: LinkedObjectNotFoundException, ObjectLockedException,
253: ServiceAccessException {
254:
255: if (!m_started) {
256: return;
257: }
258: if (revisions == null) {
259: return;
260: }
261: queueMessage(revisions.getUri(), "remove");
262:
263: }
264:
265: public void postStoreContent(SlideToken slideToken,
266: NodeRevisionDescriptors revisions,
267: NodeRevisionDescriptor revision, NodeRevisionContent content)
268: throws AccessDeniedException, ObjectNotFoundException,
269: LinkedObjectNotFoundException, ObjectLockedException,
270: ServiceAccessException {
271:
272: if (!m_started) {
273: return;
274: }
275: if (revisions == null) {
276: return;
277: }
278: queueMessage(revisions.getUri(), "store");
279: }
280:
281: private void queueMessage(String uri, String type) {
282: String msg = "slide-interceptor:" + type + "|"
283: + getNamespace().getName() + uri;
284: m_queue.add(msg);
285: }
286:
287: private Logger getLogger() {
288: return getNamespace().getLogger();
289: }
290: }
|