001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.timer.ejb;
023:
024: import java.io.Serializable;
025: import javax.ejb.EJBException;
026: import javax.ejb.MessageDrivenBean;
027: import javax.ejb.MessageDrivenContext;
028: import javax.ejb.TimedObject;
029: import javax.ejb.Timer;
030: import javax.ejb.TimerService;
031: import javax.jms.DeliveryMode;
032: import javax.jms.JMSException;
033: import javax.jms.Message;
034: import javax.jms.MessageListener;
035: import javax.jms.Queue;
036: import javax.jms.QueueConnection;
037: import javax.jms.QueueConnectionFactory;
038: import javax.jms.QueueSender;
039: import javax.jms.QueueSession;
040: import javax.jms.Session;
041: import javax.jms.TextMessage;
042: import javax.naming.InitialContext;
043:
044: import org.jboss.logging.Logger;
045:
046: /** An MDB that schedules an ejb timer for every onMessage receipt. The timer
047: * is a single event timer that expires after 10 seconds.
048: *
049: * @ejb.bean name="TimerMDB"
050: * description="TimerMDB unit test bean"
051: * destination-type="javax.jms.Queue"
052: * acknowledge-mode="Auto-acknowledge"
053: * @ejb.resource-ref res-ref-name="jms/QCF" res-type="javax.jms.QueueConnectionFactory" res-auth="Container"
054: * @jboss.destination-jndi-name name="queue/A"
055: * @jboss.resource-ref res-ref-name="jms/QCF" jndi-name="ConnectionFactory"
056: *
057: * @author Scott.Stark@jboss.org
058: * @version $Revision: 57211 $
059: */
060: public class TimerMessageBean implements MessageDrivenBean,
061: MessageListener, TimedObject {
062: private static Logger log = Logger
063: .getLogger(TimerMessageBean.class);
064: private MessageDrivenContext messageContext = null;
065: private QueueConnection qc = null;
066: private InitialContext ctx = null;
067: private long timerTimeout = 10000;
068:
069: static class ReplyInfo implements Serializable {
070: static final long serialVersionUID = 4607021612356305822L;
071: private int msgID;
072: private Queue replyTo;
073:
074: ReplyInfo(int msgID, Queue replyTo) {
075: this .msgID = msgID;
076: this .replyTo = replyTo;
077: }
078: }
079:
080: public void setMessageDrivenContext(MessageDrivenContext ctx)
081: throws EJBException {
082: messageContext = ctx;
083: }
084:
085: public void ejbCreate() {
086: try {
087: ctx = new InitialContext();
088: QueueConnectionFactory qcf = (QueueConnectionFactory) ctx
089: .lookup("java:comp/env/jms/QCF");
090: qc = qcf.createQueueConnection();
091: } catch (Exception e) {
092: throw new EJBException("ejbCreate failed", e);
093: }
094: }
095:
096: public void ejbTimeout(Timer timer) {
097: log.info("ejbTimeout(), timer: " + timer);
098: ReplyInfo info = (ReplyInfo) timer.getInfo();
099: try {
100: sendReply("ejbTimeout", info.msgID, info.replyTo);
101: } catch (Exception e) {
102: log.error("Failed to send timer msg", e);
103: }
104: }
105:
106: public void ejbRemove() throws EJBException {
107: try {
108: qc.close();
109: log.info("QueueConnection is closed.");
110: } catch (JMSException e) {
111: log.error("Failed to close connection", e);
112: }
113: }
114:
115: public void onMessage(Message message) {
116: try {
117: TextMessage msg = (TextMessage) message;
118: log.info("onMessage() called, msg=" + msg);
119: int msgID = msg.getIntProperty("UNIQUE_ID");
120: Queue replyTo = (Queue) message.getJMSReplyTo();
121: initTimer(msgID, replyTo);
122: sendReply("onMessage", msgID, replyTo);
123: } catch (Exception e) {
124: log.error("onMessage failure", e);
125: }
126: }
127:
128: public void initTimer(int msgID, Queue replyTo) {
129: try {
130: TimerService ts = messageContext.getTimerService();
131: ReplyInfo info = new ReplyInfo(msgID, replyTo);
132: Timer timer = ts.createTimer(timerTimeout, info);
133: log.info("Timer created with a timeout: " + timerTimeout
134: + " and with info: " + msgID + ", handle: "
135: + timer.getHandle());
136: } catch (Exception e) {
137: log.info("Failed to init timer", e);
138: }
139: return;
140:
141: }
142:
143: private void sendReply(String msg, int msgID, Queue dest)
144: throws JMSException {
145: QueueSession qs = null;
146: try {
147: qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
148: QueueSender sender = qs.createSender(dest);
149: TextMessage reply = qs.createTextMessage();
150: reply.setText(msg + " : " + msgID);
151: reply.setIntProperty("UNIQUE_ID", msgID);
152: sender.send(reply, DeliveryMode.NON_PERSISTENT,
153: Message.DEFAULT_PRIORITY, 180000);
154: log.info("Message sent");
155: } finally {
156: try {
157: qs.close();
158: log.info("JBossMQ QueueSession Closed");
159: } catch (JMSException e) {
160: log.error("Failed to close queue session", e);
161: }
162: }
163: }
164: }
|