001: /*--
002:
003: Copyright (C) 2002-2005 Adrian Price.
004: All rights reserved.
005:
006: Redistribution and use in source and binary forms, with or without
007: modification, are permitted provided that the following conditions
008: are met:
009:
010: 1. Redistributions of source code must retain the above copyright
011: notice, this list of conditions, and the following disclaimer.
012:
013: 2. Redistributions in binary form must reproduce the above copyright
014: notice, this list of conditions, and the disclaimer that follows
015: these conditions in the documentation and/or other materials
016: provided with the distribution.
017:
018: 3. The names "OBE" and "Open Business Engine" must not be used to
019: endorse or promote products derived from this software without prior
020: written permission. For written permission, please contact
021: adrianprice@sourceforge.net.
022:
023: 4. Products derived from this software may not be called "OBE" or
024: "Open Business Engine", nor may "OBE" or "Open Business Engine"
025: appear in their name, without prior written permission from
026: Adrian Price (adrianprice@users.sourceforge.net).
027:
028: THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
029: WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
030: OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
031: DISCLAIMED. IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT,
032: INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
033: (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
034: SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
035: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
036: STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
037: IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
038: POSSIBILITY OF SUCH DAMAGE.
039:
040: For more information on OBE, please see
041: <http://obe.sourceforge.net/>.
042:
043: */
044:
045: package org.obe.server.j2ee.ejb;
046:
047: import org.apache.commons.logging.Log;
048: import org.apache.commons.logging.LogFactory;
049: import org.obe.OBERuntimeException;
050: import org.obe.server.j2ee.J2EEServerConfig;
051: import org.obe.spi.event.ApplicationEvent;
052: import org.obe.spi.service.ApplicationEventBroker;
053:
054: import javax.jms.*;
055: import java.util.Enumeration;
056: import java.util.HashMap;
057: import java.util.Map;
058:
059: /**
060: * Forwards application events to the application event broker.
061: *
062: * @author Adrian Price
063: * @ejb:bean name="ApplicationEventForwarder"
064: * display-name="OBE Application Event Forwarder"
065: * transaction-type="Container"
066: * destination-type="javax.jms.Queue"
067: * acknowledge-mode="Auto-acknowledge"
068: * subscription-durability="Durable"
069: * @ejb:permission unchecked="true"
070: * @ejb:security-identity run-as="system"
071: * @ejb:transaction type="Required"
072: * @weblogic:transaction-isolation ${transaction.isolation}
073: * @jboss:destination-jndi-name name="queue/${xdoclet.ApplicationEventQueue}"
074: * @weblogic:message-driven destination-jndi-name="${xdoclet.ApplicationEventQueue}"
075: * jms-polling-interval-seconds="10"
076: * @ejb:resource-ref res-name="jms/AsyncRequestQueue"
077: * res-type="javax.jms.Queue"
078: * res-auth="Container"
079: * @jboss:resource-manager res-man-class="javax.jms.Queue"
080: * res-man-name="jms/AsyncRequestQueue"
081: * res-man-jndi-name="queue/${xdoclet.AsyncRequestQueue}"
082: * @weblogic:resource-description res-ref-name="jms/AsyncRequestQueue"
083: * jndi-name="${xdoclet.AsyncRequestQueue}"
084: * @weblogic:pool max-beans-in-free-pool="50"
085: * initial-beans-in-free-pool="0"
086: */
087: public class ApplicationEventForwarderEJB extends AbstractMessageEJB
088: implements MessageListener {
089:
090: private static final long serialVersionUID = 6222299503164401572L;
091: private static final Log _logger = LogFactory
092: .getLog(ApplicationEventForwarderEJB.class);
093: private static final ApplicationEventBroker _broker = J2EEServerConfig.svcMgr
094: .getApplicationEventBroker();
095: public static final String JMS_CORRELATION_ID = "JMSCorrelationID";
096: public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
097: public static final String JMS_DESTINATION = "JMSDestination";
098: public static final String JMS_EXPIRATION = "JMSExpiration";
099: public static final String JMS_MESSAGE_ID = "JMSMessageID";
100: public static final String JMS_PRIORITY = "JMSPriority";
101: public static final String JMS_REDELIVERED = "JMSRedelivered";
102: public static final String JMS_REPLY_TO = "JMSReplyTo";
103: public static final String JMS_TIMESTAMP = "JMSTimestamp";
104: public static final String JMS_TYPE = "JMSType";
105:
106: private static Map getAttributes(Message msg) throws JMSException {
107: Enumeration e = msg.getPropertyNames();
108: Map attrs = new HashMap();
109: while (e.hasMoreElements()) {
110: String propName = (String) e.nextElement();
111: attrs.put(propName, msg.getObjectProperty(propName));
112: }
113: attrs.put(JMS_CORRELATION_ID, msg.getJMSCorrelationID());
114: attrs.put(JMS_DELIVERY_MODE, new Integer(msg
115: .getJMSDeliveryMode()));
116: attrs.put(JMS_DESTINATION, msg.getJMSDestination());
117: attrs.put(JMS_EXPIRATION, new Long(msg.getJMSExpiration()));
118: attrs.put(JMS_MESSAGE_ID, msg.getJMSMessageID());
119: attrs.put(JMS_PRIORITY, new Integer(msg.getJMSPriority()));
120: attrs.put(JMS_REDELIVERED,
121: msg.getJMSRedelivered() ? Boolean.TRUE : Boolean.FALSE);
122: attrs.put(JMS_REPLY_TO, msg.getJMSReplyTo());
123: attrs.put(JMS_TIMESTAMP, new Long(msg.getJMSTimestamp()));
124: attrs.put(JMS_TYPE, msg.getJMSType());
125: return attrs;
126: }
127:
128: protected Log getLogger() {
129: return _logger;
130: }
131:
132: public void onMessage(Message msg) {
133: if (_logger.isDebugEnabled())
134: _logger.debug("onMessage(" + msg + ')');
135:
136: // If the server isn't initialized yet, we must wait until it is.
137: if (!J2EEServerConfig.isInitialized())
138: J2EEServerConfig.waitUntilInitialized(_logger);
139:
140: try {
141: Object payload;
142: if (msg instanceof ObjectMessage) {
143: ObjectMessage objMsg = (ObjectMessage) msg;
144: payload = objMsg.getObject();
145: } else if (msg instanceof TextMessage) {
146: TextMessage txtMessage = (TextMessage) msg;
147: payload = txtMessage.getText();
148: } else {
149: _logger.warn("Unprocessable message discarded: " + msg);
150: return;
151: }
152: if (payload instanceof ApplicationEvent) {
153: _broker.publish((ApplicationEvent) payload);
154: } else {
155: Map attrs = getAttributes(msg);
156: _broker.publish(payload, attrs);
157: }
158: } catch (RuntimeException e) {
159: _logger.error(
160: "Exception occurred while processing message: ", e);
161: throw e;
162: } catch (Exception e) {
163: _logger.error(
164: "Exception occurred while processing message: ", e);
165: throw new OBERuntimeException(e);
166: }
167: }
168: }
|