001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.cocoon.components.jms;
018:
019: import java.util.Date;
020: import java.util.HashMap;
021: import java.util.HashSet;
022: import java.util.Iterator;
023: import java.util.Map;
024: import java.util.Properties;
025: import java.util.Set;
026: import javax.jms.Connection;
027: import javax.jms.ConnectionFactory;
028: import javax.jms.ExceptionListener;
029: import javax.jms.JMSException;
030: import javax.jms.QueueConnection;
031: import javax.jms.QueueConnectionFactory;
032: import javax.jms.TopicConnection;
033: import javax.jms.TopicConnectionFactory;
034: import javax.naming.InitialContext;
035: import javax.naming.NamingException;
036: import org.apache.avalon.framework.CascadingException;
037: import org.apache.avalon.framework.activity.Disposable;
038: import org.apache.avalon.framework.activity.Initializable;
039: import org.apache.avalon.framework.activity.Startable;
040: import org.apache.avalon.framework.configuration.Configurable;
041: import org.apache.avalon.framework.configuration.Configuration;
042: import org.apache.avalon.framework.configuration.ConfigurationException;
043: import org.apache.avalon.framework.logger.AbstractLogEnabled;
044: import org.apache.avalon.framework.logger.Logger;
045: import org.apache.avalon.framework.parameters.ParameterException;
046: import org.apache.avalon.framework.parameters.Parameters;
047: import org.apache.avalon.framework.service.ServiceException;
048: import org.apache.avalon.framework.service.ServiceManager;
049: import org.apache.avalon.framework.service.Serviceable;
050: import org.apache.avalon.framework.thread.ThreadSafe;
051: import org.apache.cocoon.components.cron.CronJob;
052: import org.apache.cocoon.components.cron.JobScheduler;
053:
054: /**
055: * {@link org.apache.cocoon.components.jms.JMSConnectionManager} implementation.
056: */
057: public class JMSConnectionManagerImpl extends AbstractLogEnabled
058: implements JMSConnectionManager, Serviceable, Configurable,
059: Initializable, Startable, Disposable, ThreadSafe,
060: JMSConnectionEventNotifier {
061:
062: // ---------------------------------------------------- Constants
063:
064: private static final int TOPIC_CONNECTION_TYPE = 1;
065: private static final int QUEUE_CONNECTION_TYPE = 2;
066: private static final int CONNECTION_TYPE = 3;
067:
068: private static final String CONNECTION_CONFIG = "connection";
069: private static final String TOPIC_CONNECTION_CONFIG = "topic-connection";
070: private static final String QUEUE_CONNECTION_CONFIG = "queue-connection";
071: private static final String NAME_ATTR = "name";
072:
073: private static final String CONNECTION_FACTORY_PARAM = "connection-factory";
074: private static final String USERNAME_PARAM = "username";
075: private static final String PASSWORD_PARAM = "password";
076: private static final String AUTO_RECONNECT_PARAM = "auto-reconnect";
077: private static final String AUTO_RECONNECT_DELAY_PARAM = "auto-reconnect-delay";
078:
079: private static final int DEFAULT_AUTO_RECONNECT_DELAY = 1000;
080:
081: private static final String JNDI_PROPERTY_PREFIX = "java.naming.";
082:
083: // ---------------------------------------------------- Instance variables
084:
085: private ServiceManager m_serviceManager;
086:
087: private Map m_configurations;
088: private Map m_connections;
089: private Map m_listeners;
090:
091: // ---------------------------------------------------- Lifecycle
092:
/**
 * Creates an empty manager. All state is set up afterwards by
 * service(), configure() and initialize().
 */
public JMSConnectionManagerImpl() {
    super();
}
095:
096: public void service(ServiceManager manager) {
097: m_serviceManager = manager;
098: }
099:
100: public void configure(Configuration configuration)
101: throws ConfigurationException {
102: m_configurations = new HashMap(
103: configuration.getChildren().length);
104: // <connection>s
105: Configuration[] configurations = configuration
106: .getChildren(CONNECTION_CONFIG);
107: configureConnections(configurations, CONNECTION_TYPE);
108: // <topic-connection>s
109: configurations = configuration
110: .getChildren(TOPIC_CONNECTION_CONFIG);
111: configureConnections(configurations, TOPIC_CONNECTION_TYPE);
112: // <queue-connection>s
113: configurations = configuration
114: .getChildren(QUEUE_CONNECTION_CONFIG);
115: configureConnections(configurations, QUEUE_CONNECTION_TYPE);
116: }
117:
118: private void configureConnections(Configuration[] connections,
119: int type) throws ConfigurationException {
120: for (int i = 0; i < connections.length; i++) {
121: final String name = connections[i].getAttribute(NAME_ATTR);
122: if (m_configurations.containsKey(name)) {
123: throw new ConfigurationException(
124: "Duplicate connection name '" + name + "'."
125: + " Connection names must be unique.");
126: }
127: final Parameters parameters = Parameters
128: .fromConfiguration(connections[i]);
129: ConnectionConfiguration cc = new ConnectionConfiguration(
130: name, parameters, type);
131: m_configurations.put(name, cc);
132: }
133: }
134:
135: public void initialize() throws Exception {
136: m_listeners = new HashMap();
137: m_connections = new HashMap(m_configurations.size());
138: final Iterator iter = m_configurations.values().iterator();
139:
140: while (iter.hasNext()) {
141: final ConnectionConfiguration cc = (ConnectionConfiguration) iter
142: .next();
143: try {
144: final Connection connection = createConnection(cc);
145:
146: m_connections.put(cc.getName(), connection);
147: } catch (NamingException e) {
148: // ignore, warnings for NamingExceptions are logged by createConnection method
149: }
150: }
151: m_configurations = null;
152: }
153:
154: public void start() throws Exception {
155: final Iterator iter = m_connections.entrySet().iterator();
156: while (iter.hasNext()) {
157: final Map.Entry entry = (Map.Entry) iter.next();
158: if (getLogger().isDebugEnabled()) {
159: getLogger().debug(
160: "Starting JMS connection " + entry.getKey());
161: }
162: final Connection connection = (Connection) entry.getValue();
163: connection.start();
164: }
165: }
166:
167: public void stop() throws Exception {
168: final Iterator iter = m_connections.entrySet().iterator();
169: while (iter.hasNext()) {
170: final Map.Entry entry = (Map.Entry) iter.next();
171: stopConnection((String) entry.getKey(), (Connection) entry
172: .getValue());
173: }
174: }
175:
176: void stopConnection(String name, Connection connection) {
177: if (getLogger().isDebugEnabled()) {
178: getLogger().debug("Stopping JMS connection " + name);
179: }
180: try {
181: connection.stop();
182: } catch (JMSException e) {
183: // ignore
184: }
185: }
186:
187: public void dispose() {
188: final Iterator iter = m_connections.entrySet().iterator();
189: while (iter.hasNext()) {
190: final Map.Entry entry = (Map.Entry) iter.next();
191: if (getLogger().isDebugEnabled()) {
192: getLogger().debug(
193: "Closing JMS connection " + entry.getKey());
194: }
195: try {
196: final Connection connection = (Connection) entry
197: .getValue();
198: connection.close();
199: } catch (JMSException e) {
200: getLogger().error(
201: "Error closing JMS connection "
202: + entry.getKey(), e);
203: }
204: }
205: }
206:
207: // ---------------------------------------------------- ConnectionManager
208:
209: public synchronized Connection getConnection(String name) {
210: return (Connection) m_connections.get(name);
211: }
212:
213: public synchronized TopicConnection getTopicConnection(String name) {
214: return (TopicConnection) m_connections.get(name);
215: }
216:
217: public synchronized QueueConnection getQueueConnection(String name) {
218: return (QueueConnection) m_connections.get(name);
219: }
220:
221: // ---------------------------------------------------- JMSConnectionEventNotifier
222:
223: public synchronized void addConnectionListener(String name,
224: JMSConnectionEventListener listener) {
225: Set connectionListeners = (Set) m_listeners.get(name);
226: if (connectionListeners == null) {
227: connectionListeners = new HashSet();
228: m_listeners.put(name, connectionListeners);
229: }
230: connectionListeners.add(listener);
231: }
232:
233: public synchronized void removeConnectionListener(String name,
234: JMSConnectionEventListener listener) {
235: Set connectionListeners = (Set) m_listeners.get(name);
236: if (connectionListeners != null) {
237: connectionListeners.remove(listener);
238: }
239: }
240:
241: // ---------------------------------------------------- Implementation
242:
243: Connection createConnection(ConnectionConfiguration cc)
244: throws NamingException, JMSException {
245: try {
246: final InitialContext context = createInitialContext(cc
247: .getJNDIProperties());
248: final ConnectionFactory factory = (ConnectionFactory) context
249: .lookup(cc.getConnectionFactory());
250: final Connection connection = createConnection(factory, cc);
251: if (cc.isAutoReconnect()) {
252: connection
253: .setExceptionListener(new ReconnectionListener(
254: this , cc));
255: }
256: return connection;
257: } catch (NamingException e) {
258: if (getLogger().isWarnEnabled()) {
259: final Throwable rootCause = e.getRootCause();
260: if (rootCause != null) {
261: String message = e.getRootCause().getMessage();
262: if (rootCause instanceof ClassNotFoundException) {
263: String info = "WARN! *** JMS block is installed but jms client library not found. ***\n"
264: + "- For the jms block to work you must install and start a JMS server and "
265: + "place the client jar in WEB-INF/lib.";
266: if (message.indexOf("exolab") > 0) {
267: info += "\n- The default server, OpenJMS is configured in cocoon.xconf but is not bundled with Cocoon.";
268: }
269: System.err.println(info);
270: getLogger().warn(info, e);
271: } else {
272: System.out.println(message);
273: getLogger()
274: .warn(
275: "Cannot get Initial Context. Is the JNDI server reachable?",
276: e);
277: }
278: } else {
279: getLogger().warn("Failed to initialize JMS.", e);
280: }
281: }
282: throw e;
283: }
284: }
285:
286: private Connection createConnection(ConnectionFactory factory,
287: ConnectionConfiguration cc) throws JMSException {
288: if (cc.getUserName() != null) {
289: switch (cc.getType()) {
290: case CONNECTION_TYPE: {
291: return factory.createConnection(cc.getUserName(), cc
292: .getPassword());
293: }
294: case TOPIC_CONNECTION_TYPE: {
295: TopicConnectionFactory topicFactory = (TopicConnectionFactory) factory;
296: return topicFactory.createTopicConnection(cc
297: .getUserName(), cc.getPassword());
298: }
299: case QUEUE_CONNECTION_TYPE: {
300: QueueConnectionFactory queueFactory = (QueueConnectionFactory) factory;
301: return queueFactory.createQueueConnection(cc
302: .getUserName(), cc.getPassword());
303: }
304: }
305: }
306: switch (cc.getType()) {
307: case CONNECTION_TYPE: {
308: return factory.createConnection();
309: }
310: case TOPIC_CONNECTION_TYPE: {
311: TopicConnectionFactory topicFactory = (TopicConnectionFactory) factory;
312: return topicFactory.createTopicConnection();
313: }
314: case QUEUE_CONNECTION_TYPE: {
315: QueueConnectionFactory queueFactory = (QueueConnectionFactory) factory;
316: return queueFactory.createQueueConnection();
317: }
318: }
319: return null;
320: }
321:
322: private InitialContext createInitialContext(Properties properties)
323: throws NamingException {
324: if (properties != null) {
325: return new InitialContext(properties);
326: }
327: return new InitialContext();
328: }
329:
330: synchronized void removeConnection(String name) {
331: notifyListenersOfDisconnection(name);
332: final Connection connection = (Connection) m_connections
333: .remove(name);
334: stopConnection(name, connection);
335: }
336:
337: synchronized void addConnection(String name, Connection connection) {
338: m_connections.put(name, connection);
339: notifyListenersOfConnection(name);
340: }
341:
342: void scheduleReconnectionJob(ConnectionConfiguration configuration) {
343: if (getLogger().isInfoEnabled()) {
344: getLogger().info(
345: "Scheduling JMS reconnection job for: "
346: + configuration.getName());
347: }
348: JobScheduler scheduler = null;
349: try {
350: scheduler = (JobScheduler) m_serviceManager
351: .lookup(JobScheduler.ROLE);
352: Date executionTime = new Date(System.currentTimeMillis()
353: + configuration.getAutoReconnectDelay());
354: ReconnectionJob job = new ReconnectionJob(this ,
355: configuration);
356: scheduler.fireJobAt(executionTime, "reconnect_"
357: + configuration.getName(), job);
358: } catch (ServiceException e) {
359: if (getLogger().isWarnEnabled()) {
360: getLogger().warn("Cannot obtain scheduler.", e);
361: }
362: } catch (CascadingException e) {
363: if (getLogger().isWarnEnabled()) {
364: getLogger().warn(
365: "Unable to schedule reconnection job.", e);
366: }
367: } finally {
368: if (scheduler != null) {
369: m_serviceManager.release(scheduler);
370: }
371: }
372: }
373:
374: private void notifyListenersOfConnection(String name) {
375: Set connectionListeners = (Set) m_listeners.get(name);
376: if (connectionListeners != null) {
377: for (Iterator listenersIterator = connectionListeners
378: .iterator(); listenersIterator.hasNext();) {
379: JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator
380: .next();
381: listener.onConnection(name);
382: }
383: }
384: }
385:
386: private void notifyListenersOfDisconnection(String name) {
387: Set connectionListeners = (Set) m_listeners.get(name);
388: if (connectionListeners != null) {
389: for (Iterator listenersIterator = connectionListeners
390: .iterator(); listenersIterator.hasNext();) {
391: JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator
392: .next();
393: listener.onDisconnection(name);
394: }
395: }
396: }
397:
398: static final class ConnectionConfiguration {
399:
400: // ------------------------------------------------ Instance variables
401:
402: private final String m_name;
403: private final int m_type;
404: private final String m_connectionFactory;
405: private final String m_username;
406: private final String m_password;
407: private final boolean m_autoReconnect;
408: private final int m_autoReconnectDelay;
409:
410: private Properties m_jndiProperties = new Properties();
411:
412: ConnectionConfiguration(String name, Parameters parameters,
413: int type) throws ConfigurationException {
414: m_name = name;
415: try {
416: m_connectionFactory = parameters
417: .getParameter(CONNECTION_FACTORY_PARAM);
418: m_username = parameters.getParameter(USERNAME_PARAM,
419: null);
420: m_password = parameters.getParameter(PASSWORD_PARAM,
421: null);
422: m_autoReconnect = parameters.getParameterAsBoolean(
423: AUTO_RECONNECT_PARAM, false);
424: m_autoReconnectDelay = parameters
425: .getParameterAsInteger(
426: AUTO_RECONNECT_DELAY_PARAM,
427: DEFAULT_AUTO_RECONNECT_DELAY);
428:
429: // parse the jndi property parameters
430: String[] names = parameters.getNames();
431: for (int i = 0; i < names.length; i++) {
432: if (names[i].startsWith(JNDI_PROPERTY_PREFIX)) {
433: m_jndiProperties.put(names[i], parameters
434: .getParameter(names[i]));
435: }
436: }
437: } catch (ParameterException e) {
438: throw new ConfigurationException(e
439: .getLocalizedMessage());
440: }
441: m_type = type;
442: }
443:
444: String getName() {
445: return m_name;
446: }
447:
448: int getType() {
449: return m_type;
450: }
451:
452: Properties getJNDIProperties() {
453: return m_jndiProperties;
454: }
455:
456: String getConnectionFactory() {
457: return m_connectionFactory;
458: }
459:
460: String getUserName() {
461: return m_username;
462: }
463:
464: String getPassword() {
465: return m_password;
466: }
467:
468: boolean isAutoReconnect() {
469: return m_autoReconnect;
470: }
471:
472: int getAutoReconnectDelay() {
473: return m_autoReconnectDelay;
474: }
475:
476: public int hashCode() {
477: return m_name.hashCode();
478: }
479:
480: }
481:
482: static final class ReconnectionListener implements
483: ExceptionListener {
484:
485: private final JMSConnectionManagerImpl m_manager;
486: private final ConnectionConfiguration m_configuration;
487:
488: ReconnectionListener(JMSConnectionManagerImpl manager,
489: ConnectionConfiguration configuration) {
490: super ();
491: m_manager = manager;
492: m_configuration = configuration;
493: }
494:
495: public void onException(JMSException exception) {
496: m_manager.removeConnection(m_configuration.getName());
497: m_manager.scheduleReconnectionJob(m_configuration);
498: }
499:
500: }
501:
502: static final class ReconnectionJob implements CronJob {
503:
504: private final JMSConnectionManagerImpl m_manager;
505: private final ConnectionConfiguration m_configuration;
506:
507: ReconnectionJob(JMSConnectionManagerImpl manager,
508: ConnectionConfiguration configuration) {
509: super ();
510: m_manager = manager;
511: m_configuration = configuration;
512: }
513:
514: public void execute(String jobname) {
515: final Logger logger = m_manager.getLogger();
516: if (logger.isInfoEnabled()) {
517: logger.info("Reconnecting JMS connection: "
518: + m_configuration.getName());
519: }
520: try {
521: final Connection connection = m_manager
522: .createConnection(m_configuration);
523: m_manager.addConnection(m_configuration.getName(),
524: connection);
525: if (logger.isInfoEnabled()) {
526: logger
527: .info("Successfully reconnected JMS connection: "
528: + m_configuration.getName());
529: }
530: } catch (NamingException e) {
531: if (logger.isWarnEnabled()) {
532: logger.warn("Failed to reconnect.", e);
533: }
534: m_manager.scheduleReconnectionJob(m_configuration);
535: } catch (JMSException e) {
536: if (logger.isWarnEnabled()) {
537: logger.warn("Failed to reconnect.", e);
538: }
539: m_manager.scheduleReconnectionJob(m_configuration);
540: }
541: }
542: }
543:
544: }
|