001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.timer.ejb;
023:
024: import java.io.Serializable;
025: import java.util.Date;
026: import javax.ejb.EJBException;
027: import javax.ejb.MessageDrivenBean;
028: import javax.ejb.MessageDrivenContext;
029: import javax.ejb.TimedObject;
030: import javax.ejb.Timer;
031: import javax.ejb.TimerService;
032: import javax.jms.DeliveryMode;
033: import javax.jms.JMSException;
034: import javax.jms.Message;
035: import javax.jms.MessageListener;
036: import javax.jms.Queue;
037: import javax.jms.QueueConnection;
038: import javax.jms.QueueConnectionFactory;
039: import javax.jms.QueueSender;
040: import javax.jms.QueueSession;
041: import javax.jms.Session;
042: import javax.jms.TextMessage;
043: import javax.naming.InitialContext;
044:
045: import org.jboss.logging.Logger;
046:
047: /** An MDB that obtains the TimerService during the ejbCreate callback. A
048: * timer is created in onMessage using this TimerService. It delays for 1
049: * seconds and continues once a second for 10 seconds.
050: *
051: * @ejb.bean name="OnCreateTimerMDB"
052: * description="OnCreateTimerMDB unit test bean"
053: * destination-type="javax.jms.Queue"
054: * acknowledge-mode="Auto-acknowledge"
055: * @ejb.resource-ref res-ref-name="jms/QCF" res-type="javax.jms.QueueConnectionFactory" res-auth="Container"
056: * @jboss.destination-jndi-name name="queue/C"
057: * @jboss.resource-ref res-ref-name="jms/QCF" jndi-name="ConnectionFactory"
058: *
059: * @author Scott.Stark@jboss.org
060: * @version $Revision: 57211 $
061: */
062: public class OnCreateTimerMessageBean implements MessageDrivenBean,
063: MessageListener, TimedObject {
064: private static Logger log = Logger
065: .getLogger(OnCreateTimerMessageBean.class);
066: private MessageDrivenContext messageContext = null;
067: private QueueConnection qc = null;
068: private InitialContext ctx = null;
069: private TimerService ts;
070:
071: static class ReplyInfo implements Serializable {
072: static final long serialVersionUID = -8185591016792811177L;
073: private int msgID;
074: private Queue replyTo;
075: private Date first;
076: private Date last;
077:
078: ReplyInfo(int msgID, Queue replyTo, Date first, Date last) {
079: this .msgID = msgID;
080: this .replyTo = replyTo;
081: this .first = first;
082: this .last = last;
083: }
084:
085: boolean cancel(Date next) {
086: return last.compareTo(next) < 0;
087: }
088:
089: long getElapsed() {
090: return System.currentTimeMillis() - first.getTime();
091: }
092: }
093:
094: public void setMessageDrivenContext(MessageDrivenContext ctx)
095: throws EJBException {
096: messageContext = ctx;
097: }
098:
099: public void ejbCreate() {
100: try {
101: ctx = new InitialContext();
102: QueueConnectionFactory qcf = (QueueConnectionFactory) ctx
103: .lookup("java:comp/env/jms/QCF");
104: qc = qcf.createQueueConnection();
105: ts = messageContext.getTimerService();
106: } catch (Exception e) {
107: log.error("Failed to init timer", e);
108: throw new EJBException("ejbCreate failed", e);
109: }
110: }
111:
112: public void ejbTimeout(Timer timer) {
113: log.info("ejbTimeout(), timer: " + timer);
114: ReplyInfo info = (ReplyInfo) timer.getInfo();
115: Date next = timer.getNextTimeout();
116: if (info.cancel(next)) {
117: log.info("Cancelling timer");
118: timer.cancel();
119: }
120:
121: try {
122: long elapsed = info.getElapsed();
123: sendReply("ejbTimeout", info.msgID, elapsed, info.replyTo);
124: } catch (Exception e) {
125: log.error("Failed to send timer msg", e);
126: }
127: }
128:
129: public void ejbRemove() throws EJBException {
130: try {
131: qc.close();
132: log.info("QueueConnection is closed.");
133: } catch (JMSException e) {
134: log.error("Failed to close connection", e);
135: }
136: }
137:
138: public void onMessage(Message message) {
139: try {
140: TextMessage msg = (TextMessage) message;
141: log.info("onMessage() called, msg=" + msg);
142: int msgID = msg.getIntProperty("UNIQUE_ID");
143: Queue replyTo = (Queue) message.getJMSReplyTo();
144: sendReply("onMessage", msgID, 0, replyTo);
145: // Start the reply timer
146: this .initTimer(msgID, replyTo);
147: } catch (Exception e) {
148: log.error("onMessage failure", e);
149: }
150: }
151:
152: public void initTimer(int msgID, Queue replyTo) {
153: try {
154: Date first = new Date(System.currentTimeMillis() + 1000);
155: Date last = new Date(System.currentTimeMillis() + 11000);
156: ReplyInfo info = new ReplyInfo(msgID, replyTo, first, last);
157: Timer timer = ts.createTimer(first, 1000, info);
158: log.info("Timer created with a timeout: " + first
159: + " and with info: " + msgID + ", handle: "
160: + timer.getHandle());
161: } catch (Exception e) {
162: log.info("Failed to init timer", e);
163: }
164: return;
165: }
166:
167: private void sendReply(String msg, int msgID, long elapsed,
168: Queue dest) throws JMSException {
169: QueueSession qs = null;
170: try {
171: qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
172: QueueSender sender = qs.createSender(dest);
173: TextMessage reply = qs.createTextMessage();
174: reply.setText(msg + " : " + msgID);
175: reply.setIntProperty("UNIQUE_ID", msgID);
176: reply.setLongProperty("Elapsed", elapsed);
177: sender.send(reply, DeliveryMode.NON_PERSISTENT,
178: Message.DEFAULT_PRIORITY, 180000);
179: log.info("Message sent");
180: } finally {
181: try {
182: qs.close();
183: log.info("JBossMQ QueueSession Closed");
184: } catch (JMSException e) {
185: log.error("Failed to close queue session", e);
186: }
187: }
188: }
189: }
|