001: /*
002: * Copyright 1999-2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package nl.hippo.cocoon.components.jms;
017:
018: import java.util.HashMap;
019: import java.util.HashSet;
020: import java.util.Iterator;
021: import java.util.Map;
022: import java.util.Properties;
023: import java.util.Set;
024:
025: import javax.jms.Connection;
026: import javax.jms.ConnectionFactory;
027: import javax.jms.ExceptionListener;
028: import javax.jms.JMSException;
029: import javax.jms.QueueConnection;
030: import javax.jms.QueueConnectionFactory;
031: import javax.jms.TopicConnection;
032: import javax.jms.TopicConnectionFactory;
033: import javax.naming.InitialContext;
034: import javax.naming.NamingException;
035:
036: import org.apache.avalon.framework.activity.Disposable;
037: import org.apache.avalon.framework.activity.Initializable;
038: import org.apache.avalon.framework.activity.Startable;
039: import org.apache.avalon.framework.configuration.Configurable;
040: import org.apache.avalon.framework.configuration.Configuration;
041: import org.apache.avalon.framework.configuration.ConfigurationException;
042: import org.apache.avalon.framework.logger.AbstractLogEnabled;
043: import org.apache.avalon.framework.logger.Logger;
044: import org.apache.avalon.framework.parameters.ParameterException;
045: import org.apache.avalon.framework.parameters.Parameters;
046: import org.apache.avalon.framework.service.ServiceManager;
047: import org.apache.avalon.framework.service.Serviceable;
048: import org.apache.avalon.framework.thread.ThreadSafe;
049: import org.apache.cocoon.components.jms.JMSConnectionEventListener;
050: import org.apache.cocoon.components.jms.JMSConnectionEventNotifier;
051: import org.apache.cocoon.components.jms.JMSConnectionManager;
052:
053: /**
054: * {@link org.apache.cocoon.components.jms.JMSConnectionManager} implementation.
055: */
056: public class CMS_JMSConnectionManagerImpl extends AbstractLogEnabled
057: implements JMSConnectionManager, Serviceable, Configurable,
058: Initializable, Startable, Disposable, ThreadSafe,
059: JMSConnectionEventNotifier {
060:
061: // ---------------------------------------------------- Constants
062:
063: private static final int TOPIC_CONNECTION_TYPE = 1;
064: private static final int QUEUE_CONNECTION_TYPE = 2;
065: private static final int CONNECTION_TYPE = 3;
066:
067: private static final String CONNECTION_CONFIG = "connection";
068: private static final String TOPIC_CONNECTION_CONFIG = "topic-connection";
069: private static final String QUEUE_CONNECTION_CONFIG = "queue-connection";
070: private static final String NAME_ATTR = "name";
071:
072: private static final String CONNECTION_FACTORY_PARAM = "connection-factory";
073: private static final String USERNAME_PARAM = "username";
074: private static final String PASSWORD_PARAM = "password";
075: private static final String PROVIDER_URL = "java.naming.provider.url";
076: private static final String AUTO_RECONNECT_PARAM = "auto-reconnect";
077: private static final String AUTO_RECONNECT_DELAY_PARAM = "auto-reconnect-delay";
078:
079: private static final int DEFAULT_AUTO_RECONNECT_DELAY = 1000;
080:
081: private static final String JNDI_PROPERTY_PREFIX = "java.naming.";
082:
083: // ---------------------------------------------------- Instance variables
084:
085: private ServiceManager m_serviceManager;
086:
087: private Map m_configurations;
088: private Map m_connections;
089: private Map m_listeners;
090:
091: // ---------------------------------------------------- Lifecycle
092:
/**
 * Creates an empty manager. All state is supplied afterwards through
 * {@link #service}, {@link #configure} and {@link #initialize}.
 */
public CMS_JMSConnectionManagerImpl() {
    super();
}
095:
096: public void service(ServiceManager manager) {
097: m_serviceManager = manager;
098: }
099:
100: public void configure(Configuration configuration)
101: throws ConfigurationException {
102: m_configurations = new HashMap(
103: configuration.getChildren().length);
104: // <connection>s
105: Configuration[] configurations = configuration
106: .getChildren(CONNECTION_CONFIG);
107: configureConnections(configurations, CONNECTION_TYPE);
108: // <topic-connection>s
109: configurations = configuration
110: .getChildren(TOPIC_CONNECTION_CONFIG);
111: configureConnections(configurations, TOPIC_CONNECTION_TYPE);
112: // <queue-connection>s
113: configurations = configuration
114: .getChildren(QUEUE_CONNECTION_CONFIG);
115: configureConnections(configurations, QUEUE_CONNECTION_TYPE);
116: }
117:
118: private void configureConnections(Configuration[] connections,
119: int type) throws ConfigurationException {
120: for (int i = 0; i < connections.length; i++) {
121: final String name = connections[i].getAttribute(NAME_ATTR);
122: if (m_configurations.containsKey(name)) {
123: throw new ConfigurationException(
124: "Duplicate connection name '" + name + "'."
125: + " Connection names must be unique.");
126: }
127: final Parameters parameters = Parameters
128: .fromConfiguration(connections[i]);
129: ConnectionConfiguration cc = new ConnectionConfiguration(
130: name, parameters, type);
131: m_configurations.put(name, cc);
132: }
133: }
134:
135: public void initialize() throws Exception {
136: m_listeners = new HashMap();
137: m_connections = new HashMap(m_configurations.size());
138: final Iterator iter = m_configurations.values().iterator();
139:
140: while (iter.hasNext()) {
141: final ConnectionConfiguration cc = (ConnectionConfiguration) iter
142: .next();
143: try {
144: final Connection connection = createConnection(cc);
145:
146: m_connections.put(cc.getName(), connection);
147: } catch (NamingException e) {
148: // ignore, warnings for NamingExceptions are logged by createConnection method
149: }
150: }
151: m_configurations = null;
152: }
153:
154: public void start() throws Exception {
155: final Iterator iter = m_connections.entrySet().iterator();
156: while (iter.hasNext()) {
157: final Map.Entry entry = (Map.Entry) iter.next();
158: if (getLogger().isInfoEnabled()) {
159: getLogger().info(
160: "Starting JMS connection " + entry.getKey());
161: }
162: final Connection connection = (Connection) entry.getValue();
163: connection.start();
164: }
165: }
166:
167: public void stop() throws Exception {
168: final Iterator iter = m_connections.entrySet().iterator();
169: while (iter.hasNext()) {
170: final Map.Entry entry = (Map.Entry) iter.next();
171: stopConnection((String) entry.getKey(), (Connection) entry
172: .getValue());
173: }
174: }
175:
176: void stopConnection(String name, Connection connection) {
177: if (getLogger().isInfoEnabled()) {
178: getLogger().info("Stopping JMS connection " + name);
179: }
180: try {
181: connection.stop();
182: } catch (JMSException e) {
183: // ignore
184: }
185: }
186:
187: public void dispose() {
188: final Iterator iter = m_connections.entrySet().iterator();
189:
190: while (iter.hasNext()) {
191: final Map.Entry entry = (Map.Entry) iter.next();
192: if (getLogger().isDebugEnabled()) {
193: getLogger().debug(
194: "Closing JMS connection " + entry.getKey());
195: }
196: try {
197: final Connection connection = (Connection) entry
198: .getValue();
199: connection.close();
200: } catch (JMSException e) {
201: getLogger().error(
202: "Error closing JMS connection "
203: + entry.getKey(), e);
204: }
205: }
206: }
207:
208: // ---------------------------------------------------- ConnectionManager
209:
210: public synchronized Connection getConnection(String name) {
211: return (Connection) m_connections.get(name);
212: }
213:
214: public synchronized TopicConnection getTopicConnection(String name) {
215: return (TopicConnection) m_connections.get(name);
216: }
217:
218: public synchronized QueueConnection getQueueConnection(String name) {
219: return (QueueConnection) m_connections.get(name);
220: }
221:
222: // ---------------------------------------------------- JMSConnectionEventNotifier
223:
224: public synchronized void addConnectionListener(String name,
225: JMSConnectionEventListener listener) {
226: Set connectionListeners = (Set) m_listeners.get(name);
227: if (connectionListeners == null) {
228: connectionListeners = new HashSet();
229: m_listeners.put(name, connectionListeners);
230: }
231: connectionListeners.add(listener);
232: }
233:
234: public synchronized void removeConnectionListener(String name,
235: JMSConnectionEventListener listener) {
236: Set connectionListeners = (Set) m_listeners.get(name);
237: if (connectionListeners != null) {
238: connectionListeners.remove(listener);
239: }
240: }
241:
242: // ---------------------------------------------------- Implementation
243:
244: Connection createConnection(ConnectionConfiguration cc)
245: throws NamingException, JMSException {
246: try {
247: final InitialContext context = createInitialContext(cc
248: .getJNDIProperties());
249: final ConnectionFactory factory = (ConnectionFactory) context
250: .lookup(cc.getConnectionFactory());
251: final Connection connection = createConnection(factory, cc);
252: if (cc.isAutoReconnect()) {
253: try {
254: connection
255: .setExceptionListener(new ReconnectionListener(
256: this , cc));
257: } catch (JMSException ex) {
258: getLogger()
259: .error(
260: "Possible use of JMS in webserver, cannot use auto reconnect.",
261: ex);
262: }
263: }
264: return connection;
265: } catch (NamingException e) {
266: if (getLogger().isWarnEnabled()) {
267: final Throwable rootCause = e.getRootCause();
268: if (rootCause != null) {
269: String message = e.getRootCause().getMessage();
270: if (rootCause instanceof ClassNotFoundException) {
271: String info = "WARN! *** JMS block is installed but jms client library not found. ***\n"
272: + "- For the jms block to work you must install and start a JMS server and "
273: + "place the client jar in WEB-INF/lib.";
274: if (message.indexOf("exolab") > 0) {
275: info += "\n- The default server, OpenJMS is configured in cocoon.xconf but is not bundled with Cocoon.";
276: }
277: System.err.println(info);
278: getLogger().warn(info, e);
279: } else {
280:
281: getLogger().error(
282: message + " :" + cc.m_providerUrl);
283: if (getLogger().isDebugEnabled()) {
284: getLogger()
285: .debug(
286: "Cannot get Initial Context. Is the JNDI server reachable?",
287: e);
288: }
289: }
290: } else {
291: getLogger().warn("Failed to initialize JMS.", e);
292: }
293: }
294: throw e;
295: }
296: }
297:
298: private Connection createConnection(ConnectionFactory factory,
299: ConnectionConfiguration cc) throws JMSException {
300: if (cc.getUserName() != null) {
301: switch (cc.getType()) {
302: case CONNECTION_TYPE: {
303: return factory.createConnection(cc.getUserName(), cc
304: .getPassword());
305: }
306: case TOPIC_CONNECTION_TYPE: {
307: TopicConnectionFactory topicFactory = (TopicConnectionFactory) factory;
308: return topicFactory.createTopicConnection(cc
309: .getUserName(), cc.getPassword());
310: }
311: case QUEUE_CONNECTION_TYPE: {
312: QueueConnectionFactory queueFactory = (QueueConnectionFactory) factory;
313: return queueFactory.createQueueConnection(cc
314: .getUserName(), cc.getPassword());
315: }
316: }
317: }
318: switch (cc.getType()) {
319: case CONNECTION_TYPE: {
320: return factory.createConnection();
321: }
322: case TOPIC_CONNECTION_TYPE: {
323: TopicConnectionFactory topicFactory = (TopicConnectionFactory) factory;
324: return topicFactory.createTopicConnection();
325: }
326: case QUEUE_CONNECTION_TYPE: {
327: QueueConnectionFactory queueFactory = (QueueConnectionFactory) factory;
328: return queueFactory.createQueueConnection();
329: }
330: }
331: return null;
332: }
333:
334: private InitialContext createInitialContext(Properties properties)
335: throws NamingException {
336: if (properties != null) {
337: return new InitialContext(properties);
338: }
339: return new InitialContext();
340: }
341:
342: synchronized void removeConnection(String name) {
343: notifyListenersOfDisconnection(name);
344: final Connection connection = (Connection) m_connections
345: .remove(name);
346: stopConnection(name, connection);
347: }
348:
349: synchronized void addConnection(String name, Connection connection) {
350: m_connections.put(name, connection);
351: notifyListenersOfConnection(name);
352: }
353:
354: void scheduleReconnectionJob(ConnectionConfiguration configuration) {
355: if (getLogger().isInfoEnabled()) {
356: getLogger().info(
357: "Scheduling JMS reconnection job for: "
358: + configuration.getName());
359: }
360:
361: boolean m_reconnected = false;
362:
363: Runnable reconnectionJob = new ReconnectionJob(this ,
364: configuration);
365: Thread reconnectionThread = new Thread(reconnectionJob);
366: reconnectionThread.start();
367:
368: }
369:
370: private void notifyListenersOfConnection(String name) {
371: Set connectionListeners = (Set) m_listeners.get(name);
372: if (connectionListeners != null) {
373: for (Iterator listenersIterator = connectionListeners
374: .iterator(); listenersIterator.hasNext();) {
375: JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator
376: .next();
377: listener.onConnection(name);
378: }
379: }
380: }
381:
382: private void notifyListenersOfDisconnection(String name) {
383: Set connectionListeners = (Set) m_listeners.get(name);
384: if (connectionListeners != null) {
385: for (Iterator listenersIterator = connectionListeners
386: .iterator(); listenersIterator.hasNext();) {
387: JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator
388: .next();
389: listener.onDisconnection(name);
390: }
391: }
392: }
393:
394: static final class ConnectionConfiguration {
395:
396: // ------------------------------------------------ Instance variables
397:
398: private final String m_name;
399: private final int m_type;
400: private final String m_connectionFactory;
401: private final String m_username;
402: private final String m_providerUrl;
403: private final String m_password;
404: private final boolean m_autoReconnect;
405: private final int m_autoReconnectDelay;
406:
407: private Properties m_jndiProperties = new Properties();
408:
409: ConnectionConfiguration(String name, Parameters parameters,
410: int type) throws ConfigurationException {
411: m_name = name;
412: try {
413: m_connectionFactory = parameters
414: .getParameter(CONNECTION_FACTORY_PARAM);
415: m_username = parameters.getParameter(USERNAME_PARAM,
416: null);
417: m_password = parameters.getParameter(PASSWORD_PARAM,
418: null);
419: m_providerUrl = parameters.getParameter(PROVIDER_URL,
420: null);
421: m_autoReconnect = parameters.getParameterAsBoolean(
422: AUTO_RECONNECT_PARAM, false);
423: m_autoReconnectDelay = parameters
424: .getParameterAsInteger(
425: AUTO_RECONNECT_DELAY_PARAM,
426: DEFAULT_AUTO_RECONNECT_DELAY);
427:
428: // parse the jndi property parameters
429: String[] names = parameters.getNames();
430: for (int i = 0; i < names.length; i++) {
431: if (names[i].startsWith(JNDI_PROPERTY_PREFIX)) {
432: m_jndiProperties.put(names[i], parameters
433: .getParameter(names[i]));
434: }
435: }
436: } catch (ParameterException e) {
437: throw new ConfigurationException(e
438: .getLocalizedMessage());
439: }
440: m_type = type;
441: }
442:
443: String getName() {
444: return m_name;
445: }
446:
447: int getType() {
448: return m_type;
449: }
450:
451: Properties getJNDIProperties() {
452: return m_jndiProperties;
453: }
454:
455: String getConnectionFactory() {
456: return m_connectionFactory;
457: }
458:
459: String getUserName() {
460: return m_username;
461: }
462:
463: String getProviderUrl() {
464: return m_providerUrl;
465: }
466:
467: String getPassword() {
468: return m_password;
469: }
470:
471: boolean isAutoReconnect() {
472: return m_autoReconnect;
473: }
474:
475: int getAutoReconnectDelay() {
476: return m_autoReconnectDelay;
477: }
478:
479: public int hashCode() {
480: return m_name.hashCode();
481: }
482:
483: }
484:
485: static final class ReconnectionListener implements
486: ExceptionListener {
487:
488: private final CMS_JMSConnectionManagerImpl m_manager;
489: private final ConnectionConfiguration m_configuration;
490:
491: ReconnectionListener(CMS_JMSConnectionManagerImpl manager,
492: ConnectionConfiguration configuration) {
493: super ();
494: m_manager = manager;
495: m_configuration = configuration;
496: }
497:
498: public void onException(JMSException exception) {
499: final Logger logger = m_manager.getTheLogger();
500:
501: if (logger.isErrorEnabled()) {
502: logger.error("jms connection lost with: "
503: + m_configuration.m_providerUrl);
504: logger.error(exception.toString());
505: logger.error("scheduling reconnection");
506: }
507: m_manager.removeConnection(m_configuration.getName());
508:
509: m_manager.scheduleReconnectionJob(m_configuration);
510:
511: }
512:
513: }
514:
515: /**
516: * Allows the innerclass Assistant to get access to the logger. JDK 1.3 gives a
517: * NoSuchMethod error if getLogger() is called directly.
518: */
519: private Logger getTheLogger() {
520: return getLogger();
521: }
522:
523: static final class ReconnectionJob implements Runnable {
524:
525: private final CMS_JMSConnectionManagerImpl m_manager;
526: private final ConnectionConfiguration m_configuration;
527:
528: ReconnectionJob(CMS_JMSConnectionManagerImpl manager,
529: ConnectionConfiguration configuration) {
530: super ();
531: m_manager = manager;
532: m_configuration = configuration;
533: }
534:
535: public void run() {
536:
537: boolean reconnected = false;
538:
539: while (!reconnected) {
540: try {
541: Thread.sleep(m_configuration.m_autoReconnectDelay);
542: reconnected = this .execute();
543: } catch (InterruptedException e) {
544: e.printStackTrace();
545: }
546: }
547:
548: }
549:
550: public boolean execute() {
551:
552: final Logger logger = m_manager.getTheLogger();
553:
554: if (logger.isInfoEnabled()) {
555: logger.info("Reconnecting JMS connection: "
556: + m_configuration.getName());
557: }
558: try {
559: final Connection connection = m_manager
560: .createConnection(m_configuration);
561: m_manager.addConnection(m_configuration.getName(),
562: connection);
563:
564: if (logger.isInfoEnabled()) {
565: logger
566: .info("Successfully reconnected JMS connection: "
567: + m_configuration.getName());
568: }
569: return true;
570: } catch (NamingException e) {
571: if (logger.isErrorEnabled()) {
572: logger.error("Failed to reconnect to :"
573: + m_configuration.m_providerUrl);
574: }
575: if (logger.isDebugEnabled()) {
576: logger.debug("Failed to reconnect.", e);
577: }
578: return false;
579: //m_manager.scheduleReconnectionJob(m_configuration);
580: } catch (JMSException e) {
581: if (logger.isErrorEnabled()) {
582: logger.error("Failed to reconnect to :"
583: + m_configuration.m_providerUrl);
584: }
585: if (logger.isDebugEnabled()) {
586: logger.debug("Failed to reconnect.", e);
587: }
588: return false;
589: //m_manager.scheduleReconnectionJob(m_configuration);
590: }
591: }
592:
593: }
594:
595: }
|