001: /*
002: * Copyright 1999-2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package nl.hippo.cocoon.components.jms;
017:
018: import java.util.Date;
019: import java.util.HashMap;
020: import java.util.HashSet;
021: import java.util.Iterator;
022: import java.util.Map;
023: import java.util.Properties;
024: import java.util.Set;
025: import javax.jms.Connection;
026: import javax.jms.ConnectionFactory;
027: import javax.jms.ExceptionListener;
028: import javax.jms.JMSException;
029: import javax.jms.QueueConnection;
030: import javax.jms.QueueConnectionFactory;
031: import javax.jms.TopicConnection;
032: import javax.jms.TopicConnectionFactory;
033: import javax.naming.InitialContext;
034: import javax.naming.NamingException;
035: import org.apache.avalon.framework.CascadingException;
036: import org.apache.avalon.framework.activity.Disposable;
037: import org.apache.avalon.framework.activity.Initializable;
038: import org.apache.avalon.framework.activity.Startable;
039: import org.apache.avalon.framework.configuration.Configurable;
040: import org.apache.avalon.framework.configuration.Configuration;
041: import org.apache.avalon.framework.configuration.ConfigurationException;
042: import org.apache.avalon.framework.logger.AbstractLogEnabled;
043: import org.apache.avalon.framework.logger.Logger;
044: import org.apache.avalon.framework.parameters.ParameterException;
045: import org.apache.avalon.framework.parameters.Parameters;
046: import org.apache.avalon.framework.service.ServiceException;
047: import org.apache.avalon.framework.service.ServiceManager;
048: import org.apache.avalon.framework.service.Serviceable;
049: import org.apache.avalon.framework.thread.ThreadSafe;
050: import org.apache.cocoon.components.cron.CronJob;
051: import org.apache.cocoon.components.cron.JobScheduler;
052: import org.apache.cocoon.components.jms.JMSConnectionEventListener;
053: import org.apache.cocoon.components.jms.JMSConnectionEventNotifier;
054: import org.apache.cocoon.components.jms.JMSConnectionManager;
055:
056: /**
057: * {@link org.apache.cocoon.components.jms.JMSConnectionManager} implementation.
058: */
059: public class JMSConnectionManagerImpl extends AbstractLogEnabled
060: implements JMSConnectionManager, Serviceable, Configurable,
061: Initializable, Startable, Disposable, ThreadSafe,
062: JMSConnectionEventNotifier {
063:
064: // ---------------------------------------------------- Constants
065:
066: private static final int TOPIC_CONNECTION_TYPE = 1;
067: private static final int QUEUE_CONNECTION_TYPE = 2;
068: private static final int CONNECTION_TYPE = 3;
069:
070: private static final String CONNECTION_CONFIG = "connection";
071: private static final String TOPIC_CONNECTION_CONFIG = "topic-connection";
072: private static final String QUEUE_CONNECTION_CONFIG = "queue-connection";
073: private static final String NAME_ATTR = "name";
074:
075: private static final String CONNECTION_FACTORY_PARAM = "connection-factory";
076: private static final String USERNAME_PARAM = "username";
077: private static final String PASSWORD_PARAM = "password";
078: private static final String PROVIDER_URL = "java.naming.provider.url";
079: private static final String AUTO_RECONNECT_PARAM = "auto-reconnect";
080: private static final String AUTO_RECONNECT_DELAY_PARAM = "auto-reconnect-delay";
081:
082: private static final int DEFAULT_AUTO_RECONNECT_DELAY = 1000;
083:
084: private static final String JNDI_PROPERTY_PREFIX = "java.naming.";
085:
086: // ---------------------------------------------------- Instance variables
087:
088: private ServiceManager m_serviceManager;
089:
090: private Map m_configurations;
091: private Map m_connections;
092: private Map m_listeners;
093:
094: // ---------------------------------------------------- Lifecycle
095:
096: public JMSConnectionManagerImpl() {
097: }
098:
099: public void service(ServiceManager manager) {
100: m_serviceManager = manager;
101: }
102:
103: public void configure(Configuration configuration)
104: throws ConfigurationException {
105: m_configurations = new HashMap(
106: configuration.getChildren().length);
107: // <connection>s
108: Configuration[] configurations = configuration
109: .getChildren(CONNECTION_CONFIG);
110: configureConnections(configurations, CONNECTION_TYPE);
111: // <topic-connection>s
112: configurations = configuration
113: .getChildren(TOPIC_CONNECTION_CONFIG);
114: configureConnections(configurations, TOPIC_CONNECTION_TYPE);
115: // <queue-connection>s
116: configurations = configuration
117: .getChildren(QUEUE_CONNECTION_CONFIG);
118: configureConnections(configurations, QUEUE_CONNECTION_TYPE);
119: }
120:
121: private void configureConnections(Configuration[] connections,
122: int type) throws ConfigurationException {
123: for (int i = 0; i < connections.length; i++) {
124: final String name = connections[i].getAttribute(NAME_ATTR);
125: if (m_configurations.containsKey(name)) {
126: throw new ConfigurationException(
127: "Duplicate connection name '" + name + "'."
128: + " Connection names must be unique.");
129: }
130: final Parameters parameters = Parameters
131: .fromConfiguration(connections[i]);
132: ConnectionConfiguration cc = new ConnectionConfiguration(
133: name, parameters, type);
134: m_configurations.put(name, cc);
135: }
136: }
137:
138: public void initialize() throws Exception {
139: m_listeners = new HashMap();
140: m_connections = new HashMap(m_configurations.size());
141: final Iterator iter = m_configurations.values().iterator();
142:
143: while (iter.hasNext()) {
144: final ConnectionConfiguration cc = (ConnectionConfiguration) iter
145: .next();
146: try {
147: final Connection connection = createConnection(cc);
148:
149: m_connections.put(cc.getName(), connection);
150: } catch (NamingException e) {
151: // ignore, warnings for NamingExceptions are logged by createConnection method
152: }
153: }
154: m_configurations = null;
155: }
156:
157: public void start() throws Exception {
158: final Iterator iter = m_connections.entrySet().iterator();
159: while (iter.hasNext()) {
160: final Map.Entry entry = (Map.Entry) iter.next();
161: if (getLogger().isInfoEnabled()) {
162: getLogger().info(
163: "Starting JMS connection " + entry.getKey());
164: }
165: final Connection connection = (Connection) entry.getValue();
166: connection.start();
167: }
168: }
169:
170: public void stop() throws Exception {
171: final Iterator iter = m_connections.entrySet().iterator();
172: while (iter.hasNext()) {
173: final Map.Entry entry = (Map.Entry) iter.next();
174: stopConnection((String) entry.getKey(), (Connection) entry
175: .getValue());
176: }
177: }
178:
179: void stopConnection(String name, Connection connection) {
180: if (getLogger().isInfoEnabled()) {
181: getLogger().info("Stopping JMS connection " + name);
182: }
183: try {
184: connection.stop();
185: } catch (JMSException e) {
186: // ignore
187: }
188: }
189:
190: public void dispose() {
191: final Iterator iter = m_connections.entrySet().iterator();
192:
193: while (iter.hasNext()) {
194: final Map.Entry entry = (Map.Entry) iter.next();
195: if (getLogger().isDebugEnabled()) {
196: getLogger().debug(
197: "Closing JMS connection " + entry.getKey());
198: }
199: try {
200: final Connection connection = (Connection) entry
201: .getValue();
202: connection.close();
203: } catch (JMSException e) {
204: getLogger().error(
205: "Error closing JMS connection "
206: + entry.getKey(), e);
207: }
208: }
209: }
210:
211: // ---------------------------------------------------- ConnectionManager
212:
213: public synchronized Connection getConnection(String name) {
214: return (Connection) m_connections.get(name);
215: }
216:
217: public synchronized TopicConnection getTopicConnection(String name) {
218: return (TopicConnection) m_connections.get(name);
219: }
220:
221: public synchronized QueueConnection getQueueConnection(String name) {
222: return (QueueConnection) m_connections.get(name);
223: }
224:
225: // ---------------------------------------------------- JMSConnectionEventNotifier
226:
227: public synchronized void addConnectionListener(String name,
228: JMSConnectionEventListener listener) {
229: Set connectionListeners = (Set) m_listeners.get(name);
230: if (connectionListeners == null) {
231: connectionListeners = new HashSet();
232: m_listeners.put(name, connectionListeners);
233: }
234: connectionListeners.add(listener);
235: }
236:
237: public synchronized void removeConnectionListener(String name,
238: JMSConnectionEventListener listener) {
239: Set connectionListeners = (Set) m_listeners.get(name);
240: if (connectionListeners != null) {
241: connectionListeners.remove(listener);
242: }
243: }
244:
245: // ---------------------------------------------------- Implementation
246:
247: Connection createConnection(ConnectionConfiguration cc)
248: throws NamingException, JMSException {
249: try {
250: final InitialContext context = createInitialContext(cc
251: .getJNDIProperties());
252: Object factory = context.lookup(cc.getConnectionFactory());
253: final Connection connection = createConnection(factory, cc);
254: if (cc.isAutoReconnect()) {
255: connection
256: .setExceptionListener(new ReconnectionListener(
257: this , cc));
258: }
259: return connection;
260: } catch (NamingException e) {
261: if (getLogger().isWarnEnabled()) {
262: final Throwable rootCause = e.getRootCause();
263: if (rootCause != null) {
264: String message = e.getRootCause().getMessage();
265: if (rootCause instanceof ClassNotFoundException) {
266: String info = "WARN! *** JMS block is installed but jms client library not found. ***\n"
267: + "- For the jms block to work you must install and start a JMS server and "
268: + "place the client jar in WEB-INF/lib.";
269: if (message.indexOf("exolab") > 0) {
270: info += "\n- The default server, OpenJMS is configured in cocoon.xconf but is not bundled with Cocoon.";
271: }
272: System.err.println(info);
273: getLogger().warn(info, e);
274: } else {
275:
276: getLogger().error(
277: message + " :" + cc.m_providerUrl);
278: if (getLogger().isDebugEnabled()) {
279: getLogger()
280: .debug(
281: "Cannot get Initial Context. Is the JNDI server reachable?",
282: e);
283: }
284: }
285: } else {
286: getLogger().warn("Failed to initialize JMS.", e);
287: }
288: }
289: throw e;
290: }
291: }
292:
293: private Connection createConnection(Object factory,
294: ConnectionConfiguration cc) throws JMSException {
295: if (cc.getUserName() != null) {
296: switch (cc.getType()) {
297: case CONNECTION_TYPE: {
298: ConnectionFactory plainFactory = (ConnectionFactory) factory;
299: return plainFactory.createConnection(cc.getUserName(),
300: cc.getPassword());
301: }
302: case TOPIC_CONNECTION_TYPE: {
303: TopicConnectionFactory topicFactory = (TopicConnectionFactory) factory;
304: return topicFactory.createTopicConnection(cc
305: .getUserName(), cc.getPassword());
306: }
307: case QUEUE_CONNECTION_TYPE: {
308: QueueConnectionFactory queueFactory = (QueueConnectionFactory) factory;
309: return queueFactory.createQueueConnection(cc
310: .getUserName(), cc.getPassword());
311: }
312: }
313: }
314: switch (cc.getType()) {
315: case CONNECTION_TYPE: {
316: ConnectionFactory plainFactory = (ConnectionFactory) factory;
317: return plainFactory.createConnection();
318: }
319: case TOPIC_CONNECTION_TYPE: {
320: TopicConnectionFactory topicFactory = (TopicConnectionFactory) factory;
321: return topicFactory.createTopicConnection();
322: }
323: case QUEUE_CONNECTION_TYPE: {
324: QueueConnectionFactory queueFactory = (QueueConnectionFactory) factory;
325: return queueFactory.createQueueConnection();
326: }
327: }
328: return null;
329: }
330:
331: private InitialContext createInitialContext(Properties properties)
332: throws NamingException {
333: if (properties != null) {
334: return new InitialContext(properties);
335: }
336: return new InitialContext();
337: }
338:
339: synchronized void removeConnection(String name) {
340: notifyListenersOfDisconnection(name);
341: final Connection connection = (Connection) m_connections
342: .remove(name);
343: stopConnection(name, connection);
344: }
345:
346: synchronized void addConnection(String name, Connection connection) {
347: m_connections.put(name, connection);
348: notifyListenersOfConnection(name);
349: }
350:
351: void scheduleReconnectionJob(ConnectionConfiguration configuration) {
352: if (getLogger().isInfoEnabled()) {
353: getLogger().info(
354: "Scheduling JMS reconnection job for: "
355: + configuration.getName());
356: }
357: JobScheduler scheduler = null;
358: try {
359: scheduler = (JobScheduler) m_serviceManager
360: .lookup(JobScheduler.ROLE);
361: Date executionTime = new Date(System.currentTimeMillis()
362: + configuration.getAutoReconnectDelay());
363: ReconnectionJob job = new ReconnectionJob(this ,
364: configuration);
365: scheduler.fireJobAt(executionTime, "reconnect_"
366: + configuration.getName(), job);
367: } catch (ServiceException e) {
368: if (getLogger().isWarnEnabled()) {
369: getLogger().warn("Cannot obtain scheduler.", e);
370: }
371: } catch (CascadingException e) {
372: if (getLogger().isWarnEnabled()) {
373: getLogger().warn(
374: "Unable to schedule reconnection job.", e);
375: }
376: } finally {
377: if (scheduler != null) {
378: m_serviceManager.release(scheduler);
379: }
380: }
381: }
382:
383: private void notifyListenersOfConnection(String name) {
384: Set connectionListeners = (Set) m_listeners.get(name);
385: if (connectionListeners != null) {
386: for (Iterator listenersIterator = connectionListeners
387: .iterator(); listenersIterator.hasNext();) {
388: JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator
389: .next();
390: listener.onConnection(name);
391: }
392: }
393: }
394:
395: private void notifyListenersOfDisconnection(String name) {
396: Set connectionListeners = (Set) m_listeners.get(name);
397: if (connectionListeners != null) {
398: for (Iterator listenersIterator = connectionListeners
399: .iterator(); listenersIterator.hasNext();) {
400: JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator
401: .next();
402: listener.onDisconnection(name);
403: }
404: }
405: }
406:
407: static final class ConnectionConfiguration {
408:
409: // ------------------------------------------------ Instance variables
410:
411: private final String m_name;
412: private final int m_type;
413: private final String m_connectionFactory;
414: private final String m_username;
415: private final String m_providerUrl;
416: private final String m_password;
417: private final boolean m_autoReconnect;
418: private final int m_autoReconnectDelay;
419:
420: private Properties m_jndiProperties = new Properties();
421:
422: ConnectionConfiguration(String name, Parameters parameters,
423: int type) throws ConfigurationException {
424: m_name = name;
425: try {
426: m_connectionFactory = parameters
427: .getParameter(CONNECTION_FACTORY_PARAM);
428: m_username = parameters.getParameter(USERNAME_PARAM,
429: null);
430: m_password = parameters.getParameter(PASSWORD_PARAM,
431: null);
432: m_providerUrl = parameters.getParameter(PROVIDER_URL,
433: null);
434: m_autoReconnect = parameters.getParameterAsBoolean(
435: AUTO_RECONNECT_PARAM, false);
436: m_autoReconnectDelay = parameters
437: .getParameterAsInteger(
438: AUTO_RECONNECT_DELAY_PARAM,
439: DEFAULT_AUTO_RECONNECT_DELAY);
440:
441: // parse the jndi property parameters
442: String[] names = parameters.getNames();
443: for (int i = 0; i < names.length; i++) {
444: if (names[i].startsWith(JNDI_PROPERTY_PREFIX)) {
445: m_jndiProperties.put(names[i], parameters
446: .getParameter(names[i]));
447: }
448: }
449: } catch (ParameterException e) {
450: throw new ConfigurationException(e
451: .getLocalizedMessage());
452: }
453: m_type = type;
454: }
455:
456: String getName() {
457: return m_name;
458: }
459:
460: int getType() {
461: return m_type;
462: }
463:
464: Properties getJNDIProperties() {
465: return m_jndiProperties;
466: }
467:
468: String getConnectionFactory() {
469: return m_connectionFactory;
470: }
471:
472: String getUserName() {
473: return m_username;
474: }
475:
476: String getProviderUrl() {
477: return m_providerUrl;
478: }
479:
480: String getPassword() {
481: return m_password;
482: }
483:
484: boolean isAutoReconnect() {
485: return m_autoReconnect;
486: }
487:
488: int getAutoReconnectDelay() {
489: return m_autoReconnectDelay;
490: }
491:
492: public int hashCode() {
493: return m_name.hashCode();
494: }
495:
496: }
497:
498: static final class ReconnectionListener implements
499: ExceptionListener {
500:
501: private final JMSConnectionManagerImpl m_manager;
502: private final ConnectionConfiguration m_configuration;
503:
504: ReconnectionListener(JMSConnectionManagerImpl manager,
505: ConnectionConfiguration configuration) {
506: super ();
507: m_manager = manager;
508: m_configuration = configuration;
509: }
510:
511: public void onException(JMSException exception) {
512: final Logger logger = m_manager.getTheLogger();
513:
514: if (logger.isErrorEnabled()) {
515: logger.error("jms connection lost with: "
516: + m_configuration.m_providerUrl);
517: logger.error(exception.toString());
518: logger.error("scheduling reconnection");
519: }
520: m_manager.removeConnection(m_configuration.getName());
521: m_manager.scheduleReconnectionJob(m_configuration);
522: }
523:
524: }
525:
526: /**
527: + * Allows the innerclass Assistant to get access to the logger. JDK 1.3 gives a
528: + * NoSuchMethod error if getLogger() is called directly.
529: + */
530: private Logger getTheLogger() {
531: return getLogger();
532: }
533:
534: static final class ReconnectionJob implements CronJob {
535:
536: private final JMSConnectionManagerImpl m_manager;
537: private final ConnectionConfiguration m_configuration;
538:
539: ReconnectionJob(JMSConnectionManagerImpl manager,
540: ConnectionConfiguration configuration) {
541: super ();
542: m_manager = manager;
543: m_configuration = configuration;
544: }
545:
546: public void execute(String jobname) {
547:
548: final Logger logger = m_manager.getTheLogger();
549:
550: if (logger.isInfoEnabled()) {
551: logger.info("Reconnecting JMS connection: "
552: + m_configuration.getName());
553: }
554: try {
555: final Connection connection = m_manager
556: .createConnection(m_configuration);
557: m_manager.addConnection(m_configuration.getName(),
558: connection);
559: if (logger.isInfoEnabled()) {
560: logger
561: .info("Successfully reconnected JMS connection: "
562: + m_configuration.getName());
563: }
564: } catch (NamingException e) {
565: if (logger.isErrorEnabled()) {
566: logger.error("Failed to reconnect to :"
567: + m_configuration.m_providerUrl);
568: }
569: if (logger.isDebugEnabled()) {
570: logger.debug("Failed to reconnect.", e);
571: }
572:
573: m_manager.scheduleReconnectionJob(m_configuration);
574: } catch (JMSException e) {
575: if (logger.isErrorEnabled()) {
576: logger.error("Failed to reconnect to :"
577: + m_configuration.m_providerUrl);
578: }
579: if (logger.isDebugEnabled()) {
580: logger.debug("Failed to reconnect.", e);
581: }
582: m_manager.scheduleReconnectionJob(m_configuration);
583: }
584: }
585: }
586:
587: }
|