001: /*--
002:
003: Copyright (C) 2002-2005 Adrian Price.
004: All rights reserved.
005:
006: Redistribution and use in source and binary forms, with or without
007: modification, are permitted provided that the following conditions
008: are met:
009:
010: 1. Redistributions of source code must retain the above copyright
011: notice, this list of conditions, and the following disclaimer.
012:
013: 2. Redistributions in binary form must reproduce the above copyright
014: notice, this list of conditions, and the disclaimer that follows
015: these conditions in the documentation and/or other materials
016: provided with the distribution.
017:
018: 3. The names "OBE" and "Open Business Engine" must not be used to
019: endorse or promote products derived from this software without prior
020: written permission. For written permission, please contact
021: adrianprice@sourceforge.net.
022:
023: 4. Products derived from this software may not be called "OBE" or
024: "Open Business Engine", nor may "OBE" or "Open Business Engine"
025: appear in their name, without prior written permission from
026: Adrian Price (adrianprice@users.sourceforge.net).
027:
028: THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
029: WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
030: OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
031: DISCLAIMED. IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT,
032: INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
033: (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
034: SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
035: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
036: STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
037: IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
038: POSSIBILITY OF SUCH DAMAGE.
039:
040: For more information on OBE, please see
041: <http://obe.sourceforge.net/>.
042:
043: */
044:
045: package org.obe.server.j2ee.ejb;
046:
047: import org.apache.commons.logging.Log;
048: import org.apache.commons.logging.LogFactory;
049: import org.obe.client.api.rmi.JNDIHelper;
050: import org.obe.spi.service.ServerConfig;
051:
052: import javax.ejb.CreateException;
053: import javax.ejb.EJBException;
054: import javax.jms.*;
055: import javax.naming.InitialContext;
056: import javax.naming.NamingException;
057: import java.io.Serializable;
058: import java.util.Iterator;
059: import java.util.Map;
060:
061: /**
062: * Sends/posts messages to JMS destinations.
063: *
064: * @author Adrian Price
065: * @ejb:bean type="Stateless"
066: * name="JMSMessageProducer"
067: * display-name="OBE JMS Message Producer"
068: * local-jndi-name="org/obe/ejb/JMSMessageProducerLocal"
069: * transaction-type="Container"
070: * view-type="local"
071: * @ejb:home local-extends="javax.ejb.EJBLocalHome"
072: * package="org.obe.server.j2ee.ejb"
073: * @ejb:interface local-extends="javax.ejb.EJBLocalObject"
074: * package="org.obe.server.j2ee.ejb"
075: * @ejb:permission unchecked="true"
076: * @ejb:transaction type="Required"
077: * @weblogic:transaction-isolation ${transaction.isolation}
078: * @ejb:resource-ref res-name="jms/TopicConnectionFactory"
079: * res-type="javax.jms.TopicConnectionFactory"
080: * res-auth="Container"
081: * @jboss:resource-manager res-man-class="javax.jms.TopicConnectionFactory"
082: * res-man-name="jms/TopicConnectionFactory"
083: * res-man-jndi-name="java:/${xdoclet.jboss.TopicConnectionFactory}"
084: * @weblogic:resource-description res-ref-name="jms/TopicConnectionFactory"
085: * jndi-name="${xdoclet.TopicConnectionFactory}"
086: * @ejb:resource-ref res-name="jms/QueueConnectionFactory"
087: * res-type="javax.jms.QueueConnectionFactory"
088: * res-auth="Container"
089: * @jboss:resource-manager res-man-class="javax.jms.QueueConnectionFactory"
090: * res-man-name="jms/QueueConnectionFactory"
091: * res-man-jndi-name="java:/${xdoclet.jboss.QueueConnectionFactory}"
092: * @weblogic:resource-description res-ref-name="jms/QueueConnectionFactory"
093: * jndi-name="${xdoclet.QueueConnectionFactory}"
094: * @weblogic:pool max-beans-in-free-pool="50"
095: * initial-beans-in-free-pool="0"
096: */
097: public class JMSMessageProducerEJB extends AbstractSessionEJB {
098: private static final long serialVersionUID = -6303316955298752969L;
099: private static final Log _logger = LogFactory
100: .getLog(JMSMessageProducerEJB.class);
101: private static final String TOPIC_CONNECTION_FACTORY = "java:comp/env/jms/TopicConnectionFactory";
102: private static final String QUEUE_CONNECTION_FACTORY = "java:comp/env/jms/QueueConnectionFactory";
103: // These can be statics, as they are thread-safe objects.
104: private static TopicConnection _tcon;
105: private static QueueConnection _qcon;
106:
107: // These must be non-static, as they are not thread-safe.
108: private TopicSession _tsession;
109: private TopicPublisher _tpublisher;
110: private QueueSession _qsession;
111: private QueueSender _qsender;
112:
113: public JMSMessageProducerEJB() {
114: }
115:
116: /**
117: * @ejb:create-method
118: */
119: public void ejbCreate() throws CreateException {
120: if (_logger.isDebugEnabled() && ServerConfig.isVerbose())
121: _logger.debug("ejbCreate");
122: try {
123: connectJMS();
124: _tsession = _tcon.createTopicSession(false,
125: Session.AUTO_ACKNOWLEDGE);
126: _qsession = _qcon.createQueueSession(false,
127: Session.AUTO_ACKNOWLEDGE);
128: _tpublisher = _tsession.createPublisher(null);
129: _qsender = _qsession.createSender(null);
130: } catch (NamingException e) {
131: throw new EJBException(e);
132: } catch (JMSException e) {
133: throw new EJBException(e);
134: }
135: }
136:
137: public void ejbRemove() {
138: if (_logger.isDebugEnabled() && ServerConfig.isVerbose())
139: _logger.debug("ejbRemove");
140: try {
141: _tsession.close();
142: _qsession.close();
143: } catch (JMSException e) {
144: _logger.error(e);
145: }
146: }
147:
148: protected Log getLogger() {
149: return _logger;
150: }
151:
152: /**
153: * Send a JMS message with user-specified properties to a topic. This
154: * method supplies default values for delivery mode (PERSISTENT), priority
155: * (4) and TTL (0).
156: *
157: * @param topic The topic to which the message should be sent.
158: * @param payload The message payload.
159: * @param props Message properties.
160: * @throws JMSException if a problem occurs within JMS
161: * @ejb:interface-method
162: */
163: public void publish(Topic topic, Object payload, Map props)
164: throws JMSException {
165:
166: send(_tsession, _tpublisher, topic, true, payload, props,
167: DeliveryMode.PERSISTENT, 4, 0L);
168: }
169:
170: /**
171: * Send a JMS message with user-specified properties to a topic.
172: *
173: * @param topic The topic to which the message should be sent.
174: * @param payload The message payload.
175: * @param props Message properties.
176: * @param deliveryMode Delivery mode: <code>DeliveryMode.PERSISTENT</code>
177: * or <code>DeliveryMode.NON_PERSISTENT</code>.
178: * @param priority Message priority: 0 - 9.
179: * @param timeToLive Message lifetime in milliseconds.
180: * @throws JMSException if a problem occurs within JMS
181: * @ejb:interface-method
182: */
183: public void publish(Topic topic, Object payload, Map props,
184: int deliveryMode, int priority, long timeToLive)
185: throws JMSException {
186:
187: send(_tsession, _tpublisher, topic, true, payload, props,
188: deliveryMode, priority, timeToLive);
189: }
190:
191: /**
192: * Send a JMS message with user-specified properties to a topic. This
193: * method supplies default values for delivery mode (PERSISTENT), priority
194: * (4) and TTL (0).
195: *
196: * @param queue The queue to which the message should be sent.
197: * @param payload The message payload.
198: * @param props Message properties.
199: * @throws JMSException if a problem occurs within JMS
200: * @ejb:interface-method
201: */
202: public void send(Queue queue, Object payload, Map props)
203: throws JMSException {
204:
205: send(_qsession, _qsender, queue, false, payload, props,
206: DeliveryMode.PERSISTENT, 4, 0L);
207: }
208:
209: /**
210: * Send a JMS message with user-specified properties to a queue.
211: *
212: * @param queue The queue to which the message should be sent.
213: * @param payload The message payload.
214: * @param props Message properties.
215: * @param deliveryMode Delivery mode: <code>DeliveryMode.PERSISTENT</code>
216: * or <code>DeliveryMode.NON_PERSISTENT</code>.
217: * @param priority Message priority: 0 - 9.
218: * @param timeToLive Message lifetime in milliseconds.
219: * @throws JMSException if a problem occurs within JMS
220: * @ejb:interface-method
221: */
222: public void send(Queue queue, Object payload, Map props,
223: int deliveryMode, int priority, long timeToLive)
224: throws JMSException {
225:
226: send(_qsession, _qsender, queue, false, payload, props,
227: deliveryMode, priority, timeToLive);
228: }
229:
230: private void send(Session session, MessageProducer producer,
231: Destination destination, boolean isTopic, Object payload,
232: Map props, int deliveryMode, int priority, long timeToLive)
233: throws JMSException {
234:
235: if (_logger.isDebugEnabled()) {
236: _logger.debug("send(" + session + ',' + producer + ','
237: + destination + ',' + isTopic + ',' + payload + ','
238: + props + ',' + deliveryMode + ',' + priority + ','
239: + timeToLive + ')');
240: }
241:
242: // Create a message of the appropriate type.
243: Message msg;
244: if (payload instanceof String) {
245: msg = session.createTextMessage((String) payload);
246: } else if (payload instanceof byte[]) {
247: msg = session.createBytesMessage();
248: ((BytesMessage) msg).writeBytes((byte[]) payload);
249: } else if (payload instanceof Map) {
250: msg = session.createMapMessage();
251: MapMessage mapMsg = (MapMessage) msg;
252: Map map = (Map) payload;
253: for (Iterator i = map.entrySet().iterator(); i.hasNext();) {
254: Map.Entry entry = (Map.Entry) i.next();
255: mapMsg.setObject(entry.getKey().toString(), entry
256: .getValue());
257: }
258: } else {
259: msg = session.createObjectMessage((Serializable) payload);
260: }
261:
262: // Set header properties.
263: if (props != null)
264: setHeaderProperties(msg, props);
265:
266: // Send the message.
267: if (_logger.isDebugEnabled())
268: _logger.debug("Sending message");
269: if (isTopic) {
270: ((TopicPublisher) producer).publish((Topic) destination,
271: msg, deliveryMode, priority, timeToLive);
272: } else {
273: ((QueueSender) producer).send((Queue) destination, msg,
274: deliveryMode, priority, timeToLive);
275: }
276: }
277:
278: private void setHeaderProperties(Message msg, Map props) {
279: try {
280: msg.clearProperties();
281: for (Iterator i = props.keySet().iterator(); i.hasNext();) {
282: String key = (String) i.next();
283: Object value = props.get(key);
284: msg.setObjectProperty(key, value);
285: }
286: } catch (JMSException e) {
287: _logger.error(e);
288: }
289: }
290:
291: private static void connectJMS() throws NamingException,
292: JMSException {
293: // TODO: use ThreadLocal to implement multi-processor-safe locking.
294: if (_tcon == null) {
295: synchronized (JMSMessageProducerEJB.class) {
296: if (_tcon == null) {
297: InitialContext ic = JNDIHelper.getInitialContext();
298: TopicConnectionFactory tconFactory = (TopicConnectionFactory) ic
299: .lookup(TOPIC_CONNECTION_FACTORY);
300: _tcon = tconFactory.createTopicConnection();
301: _tcon.start();
302: QueueConnectionFactory qconFactory = (QueueConnectionFactory) ic
303: .lookup(QUEUE_CONNECTION_FACTORY);
304: _qcon = qconFactory.createQueueConnection();
305: _qcon.start();
306: ic.close();
307: }
308: }
309: }
310: }
311: }
|