001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.cocoon.components.jms;
018:
019: import javax.jms.DeliveryMode;
020: import javax.jms.JMSException;
021: import javax.jms.Message;
022: import javax.jms.Session;
023: import javax.jms.Topic;
024: import javax.jms.TopicConnection;
025: import javax.jms.TopicPublisher;
026: import javax.jms.TopicSession;
027:
028: import org.apache.avalon.framework.activity.Disposable;
029: import org.apache.avalon.framework.activity.Initializable;
030: import org.apache.avalon.framework.logger.AbstractLogEnabled;
031: import org.apache.avalon.framework.parameters.ParameterException;
032: import org.apache.avalon.framework.parameters.Parameterizable;
033: import org.apache.avalon.framework.parameters.Parameters;
034: import org.apache.avalon.framework.service.ServiceException;
035: import org.apache.avalon.framework.service.ServiceManager;
036: import org.apache.avalon.framework.service.Serviceable;
037:
038: /**
039: * Abstract JMS message publisher. Use this as a basis for components
040: * that want to publish JMS messages.
041: * When used in conjunction with the default {@link org.apache.cocoon.components.jms.JMSConnectionManager}
042: * implementation this class supports automatic reconnection when the connection gets severed.
043: *
044: * <p>Parameters:</p>
045: * <table border="1">
046: * <tbody>
047: * <tr>
048: * <th align="left">parameter</th>
049: * <th align="left">required</th>
050: * <th align="left">default</th>
051: * <th align="left">description</th>
052: * </tr>
053: * <tr>
054: * <td valign="top">connection</td>
055: * <td valign="top">yes</td>
056: * <td> </td>
057: * <td valign="top">
058: * Name of the connection registered with
059: * {@link org.apache.cocoon.components.jms.JMSConnectionManager}.
060: * This must be a topic connection.
061: * </td>
062: * </tr>
063: * <tr>
064: * <td valign="top">topic</td>
065: * <td valign="top">yes</td>
066: * <td> </td>
067: * <td valign="top">The name of the topic to publish messages to.</td>
068: * </tr>
069: * <tr>
070: * <td valign="top">priority</td>
071: * <td valign="top">no</td>
072: * <td>4</td>
073: * <td valign="top">the priority of the published messages</td>
074: * </tr>
075: * <tr>
076: * <td valign="top">time-to-live</td>
077: * <td valign="top">no</td>
078: * <td>10000</td>
079: * <td valign="top">the message's lifetime in milliseconds</td>
080: * </tr>
081: * <tr>
082: * <td valign="top">persistent-delivery</td>
083: * <td valign="top">no</td>
084: * <td>false</td>
085: * <td valign="top">whether to use persistent delivery mode when publishing messages</td>
086: * </tr>
087: * </tbody>
088: * </table>
089: *
090: * @version CVS $Id: AbstractMessagePublisher.java 30941 2004-07-29 19:56:58Z vgritsenko $
091: */
092: public abstract class AbstractMessagePublisher extends
093: AbstractLogEnabled implements Serviceable, Parameterizable,
094: Initializable, Disposable, JMSConnectionEventListener {
095:
096: // ---------------------------------------------------- Constants
097:
098: private static final String CONNECTION_PARAM = "connection";
099: private static final String TOPIC_PARAM = "topic";
100: private static final String PRIORITY_PARAM = "priority";
101: private static final String TIME_TO_LIVE_PARAM = "time-to-live";
102: private static final String PERSISTENT_DELIVERY_PARAM = "persistent-delivery";
103:
104: private static final int DEFAULT_PRIORITY = 4;
105: private static final int DEFAULT_TIME_TO_LIVE = 10000;
106:
107: // ---------------------------------------------------- Instance variables
108:
109: private ServiceManager m_manager;
110: private JMSConnectionManager m_connectionManager;
111:
112: protected TopicSession m_session;
113: protected TopicPublisher m_publisher;
114:
115: protected int m_mode;
116: protected int m_priority;
117: protected int m_timeToLive;
118: protected String m_topicName;
119: protected int m_acknowledgeMode;
120: protected String m_connectionName;
121:
122: // ---------------------------------------------------- Lifecycle
123:
124: public AbstractMessagePublisher() {
125: }
126:
127: public void service(ServiceManager manager) throws ServiceException {
128: m_manager = manager;
129: m_connectionManager = (JMSConnectionManager) m_manager
130: .lookup(JMSConnectionManager.ROLE);
131: }
132:
133: public void parameterize(Parameters parameters)
134: throws ParameterException {
135: m_connectionName = parameters.getParameter(CONNECTION_PARAM);
136: m_topicName = parameters.getParameter(TOPIC_PARAM);
137: m_priority = parameters.getParameterAsInteger(PRIORITY_PARAM,
138: DEFAULT_PRIORITY);
139: boolean persistent = parameters.getParameterAsBoolean(
140: PERSISTENT_DELIVERY_PARAM, false);
141: m_mode = (persistent ? DeliveryMode.PERSISTENT
142: : DeliveryMode.NON_PERSISTENT);
143: m_timeToLive = parameters.getParameterAsInteger(
144: TIME_TO_LIVE_PARAM, DEFAULT_TIME_TO_LIVE);
145: }
146:
147: public void initialize() throws Exception {
148: if (m_connectionManager instanceof JMSConnectionEventNotifier) {
149: ((JMSConnectionEventNotifier) m_connectionManager)
150: .addConnectionListener(m_connectionName, this );
151: }
152: createSessionAndPublisher();
153: }
154:
155: public void dispose() {
156: closePublisherAndSession();
157: if (m_manager != null) {
158: if (m_connectionManager != null) {
159: m_manager.release(m_connectionManager);
160: }
161: }
162: }
163:
164: // ---------------------------------------------------- JMSConnectionEventListener
165:
166: public void onConnection(String name) {
167: if (getLogger().isInfoEnabled()) {
168: getLogger().info(
169: "Creating publisher because of reconnection");
170: }
171: try {
172: createSessionAndPublisher();
173: } catch (JMSException e) {
174: if (getLogger().isWarnEnabled()) {
175: getLogger()
176: .warn(
177: "Reinitialization after reconnection failed",
178: e);
179: }
180: }
181: }
182:
183: public void onDisconnection(String name) {
184: if (getLogger().isInfoEnabled()) {
185: getLogger().info(
186: "Closing subscriber because of disconnection");
187: }
188: closePublisherAndSession();
189: }
190:
191: // ---------------------------------------------------- Implementation
192:
193: /**
194: * Concrete classes call this method to publish messages.
195: */
196: protected synchronized void publishMessage(Message message)
197: throws JMSException {
198: // TODO: discover disconnected state and queue messages until connected.
199: if (getLogger().isDebugEnabled()) {
200: getLogger().debug("Publishing message '" + message + "'");
201: }
202: m_publisher.publish(message, m_mode, m_priority, m_timeToLive);
203: }
204:
205: private void createSessionAndPublisher() throws JMSException {
206: // set the default acknowledge mode
207: // concrete implementations may override this
208: m_acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
209:
210: // create the message publisher
211: final TopicConnection connection = (TopicConnection) m_connectionManager
212: .getConnection(m_connectionName);
213: if (connection != null) {
214: m_session = connection.createTopicSession(false,
215: m_acknowledgeMode);
216: final Topic topic = m_session.createTopic(m_topicName);
217: m_publisher = m_session.createPublisher(topic);
218: } else {
219: if (getLogger().isWarnEnabled()) {
220: getLogger().warn(
221: "Could not obtain JMS connection '"
222: + m_connectionName + "'");
223: }
224: }
225: }
226:
227: private void closePublisherAndSession() {
228: if (m_publisher != null) {
229: try {
230: m_publisher.close();
231: } catch (JMSException e) {
232: getLogger().error("Error closing publisher.", e);
233: }
234: }
235: if (m_session != null) {
236: try {
237: m_session.close();
238: } catch (JMSException e) {
239: getLogger().warn("Error closing session.", e);
240: }
241: }
242: }
243:
244: }
|