001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.cocoon.components.jms;
018:
019: import javax.jms.JMSException;
020: import javax.jms.MessageListener;
021: import javax.jms.Session;
022: import javax.jms.Topic;
023: import javax.jms.TopicConnection;
024: import javax.jms.TopicSession;
025: import javax.jms.TopicSubscriber;
026: import org.apache.avalon.framework.activity.Disposable;
027: import org.apache.avalon.framework.activity.Initializable;
028: import org.apache.avalon.framework.logger.AbstractLogEnabled;
029: import org.apache.avalon.framework.parameters.ParameterException;
030: import org.apache.avalon.framework.parameters.Parameterizable;
031: import org.apache.avalon.framework.parameters.Parameters;
032: import org.apache.avalon.framework.service.ServiceException;
033: import org.apache.avalon.framework.service.ServiceManager;
034: import org.apache.avalon.framework.service.Serviceable;
035:
036: /**
037: * Abstract {@link javax.jms.MessageListener} implementation.
038: * Use this as a basis for concrete MessageListener implementations.
039: * When used in conjunction with the default {@link org.apache.cocoon.components.jms.JMSConnectionManager}
040: * implementation this class supports automatic reconnection when the connection gets severed.
041: *
042: * <p>Parameters:</p>
043: * <table border="1">
044: * <tbody>
045: * <tr>
046: * <th align="left">parameter</th>
047: * <th align="left">required/default</th>
048: * <th align="left">description</th>
049: * </tr>
050: * <tr>
051: * <td valign="top">connection</td>
052: * <td valign="top">required</td>
053: * <td valign="top">
054: * Name of the connection registered with
055: * {@link org.apache.cocoon.components.jms.JMSConnectionManager}.
056: * This must be a topic connection.
057: * </td>
058: * </tr>
059: * <tr>
060: * <td>topic</td>
061: * <td>required</td>
062: * <td>The name of the topic to subscribe to.</td>
063: * </tr>
064: * <tr>
065: * <td>subscription-id</td>
066: * <td>(<code>null</code>)</td>
067: * <td>An optional durable subscription id.</td>
068: * </tr>
069: * <tr>
070: * <td>message-selector</td>
071: * <td>(<code>null</code>)</td>
072: * <td>An optional message selector.</td>
073: * </tr>
074: * </tbody>
075: * </table>
076: *
077: * @version CVS $Id: AbstractMessageListener.java 30941 2004-07-29 19:56:58Z vgritsenko $
078: */
079: public abstract class AbstractMessageListener extends
080: AbstractLogEnabled implements MessageListener, Serviceable,
081: Parameterizable, Initializable, Disposable,
082: JMSConnectionEventListener {
083:
084: // ---------------------------------------------------- Constants
085:
086: private static final String CONNECTION_PARAM = "connection";
087: private static final String TOPIC_PARAM = "topic";
088: private static final String SUBSCRIPTION_ID_PARAM = "subscription-id";
089: private static final String MESSAGE_SELECTOR_PARAM = "message-selector";
090:
091: // ---------------------------------------------------- Instance variables
092:
093: protected ServiceManager m_manager;
094:
095: /* configuration */
096: protected String m_connectionName;
097: protected String m_topicName;
098: protected String m_subscriptionId;
099: protected String m_selector;
100: protected int m_acknowledgeMode;
101:
102: /* connection manager component */
103: private JMSConnectionManager m_connectionManager;
104:
105: /* our session */
106: private TopicSession m_session;
107:
108: /* our subscriber */
109: private TopicSubscriber m_subscriber;
110:
111: // ---------------------------------------------------- Lifecycle
112:
113: public AbstractMessageListener() {
114: }
115:
116: public void service(ServiceManager manager) throws ServiceException {
117: m_manager = manager;
118: m_connectionManager = (JMSConnectionManager) m_manager
119: .lookup(JMSConnectionManager.ROLE);
120: }
121:
122: public void parameterize(Parameters parameters)
123: throws ParameterException {
124:
125: m_connectionName = parameters.getParameter(CONNECTION_PARAM);
126: m_topicName = parameters.getParameter(TOPIC_PARAM);
127:
128: m_subscriptionId = parameters.getParameter(
129: SUBSCRIPTION_ID_PARAM, null);
130: m_selector = parameters.getParameter(MESSAGE_SELECTOR_PARAM,
131: null);
132:
133: }
134:
135: /**
136: * Registers this MessageListener as a TopicSubscriber to the configured Topic.
137: * @throws Exception
138: */
139: public void initialize() throws Exception {
140: if (m_connectionManager instanceof JMSConnectionEventNotifier) {
141: ((JMSConnectionEventNotifier) m_connectionManager)
142: .addConnectionListener(m_connectionName, this );
143: }
144: createSessionAndSubscriber();
145: }
146:
147: public void dispose() {
148: closeSubscriberAndSession();
149: m_manager.release(m_connectionManager);
150: }
151:
152: public void onConnection(String name) {
153: if (getLogger().isInfoEnabled()) {
154: getLogger().info(
155: "Creating subscriber because of reconnection");
156: }
157: try {
158: createSessionAndSubscriber();
159: } catch (JMSException e) {
160: if (getLogger().isWarnEnabled()) {
161: getLogger()
162: .warn(
163: "Reinitialization after reconnection failed",
164: e);
165: }
166: }
167: }
168:
169: public void onDisconnection(String name) {
170: if (getLogger().isInfoEnabled()) {
171: getLogger().info(
172: "Closing subscriber because of disconnection");
173: }
174: closeSubscriberAndSession();
175: }
176:
177: private void createSessionAndSubscriber() throws JMSException {
178: // set the default acknowledge mode to dups
179: // concrete implementations may want to override this
180: m_acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
181:
182: // register this MessageListener with a TopicSubscriber
183: final TopicConnection connection = (TopicConnection) m_connectionManager
184: .getConnection(m_connectionName);
185: if (connection != null) {
186: m_session = connection.createTopicSession(false,
187: m_acknowledgeMode);
188: final Topic topic = m_session.createTopic(m_topicName);
189: if (m_subscriptionId != null) {
190: m_subscriber = m_session.createDurableSubscriber(topic,
191: m_subscriptionId, m_selector, false);
192: } else {
193: m_subscriber = m_session.createSubscriber(topic,
194: m_selector, false);
195: }
196: m_subscriber.setMessageListener(this );
197: // recover in case of reconnection
198: m_session.recover();
199: } else {
200: if (getLogger().isWarnEnabled()) {
201: getLogger().warn(
202: "Could not obtain JMS connection '"
203: + m_connectionName + "'");
204: }
205: }
206: }
207:
208: private void closeSubscriberAndSession() {
209: if (m_subscriber != null) {
210: try {
211: m_subscriber.close();
212: } catch (JMSException e) {
213: getLogger().error("Error closing subscriber", e);
214: } finally {
215: m_subscriber = null;
216: }
217: }
218: if (m_session != null) {
219: try {
220: m_session.close();
221: } catch (JMSException e) {
222: getLogger().error("Error closing session", e);
223: } finally {
224: m_session = null;
225: }
226: }
227: }
228: }
|