001: /*
002: * JOnAS: Java(TM) Open Application Server
003: * Copyright (C) 1999 Bull S.A.
004: * Contact: jonas-team@objectweb.org
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
019: * USA
020: *
021: * --------------------------------------------------------------------------
022: * $Id: SenderSF.java 5995 2004-12-17 15:08:36Z joaninh $
023: * --------------------------------------------------------------------------
024: */
025:
026: // SenderSF.java
027: // Stateful Session Bean
028: package org.objectweb.jonas.jtests.beans.message;
029:
030: import java.rmi.RemoteException;
031: import java.util.Collection;
032: import java.util.Iterator;
033:
034: import javax.ejb.CreateException;
035: import javax.ejb.EJBException;
036: import javax.ejb.FinderException;
037: import javax.ejb.SessionBean;
038: import javax.ejb.SessionContext;
039: import javax.jms.JMSException;
040: import javax.jms.MapMessage;
041: import javax.jms.Queue;
042: import javax.jms.QueueConnection;
043: import javax.jms.QueueConnectionFactory;
044: import javax.jms.QueueSender;
045: import javax.jms.QueueSession;
046: import javax.jms.Topic;
047: import javax.jms.TopicConnection;
048: import javax.jms.TopicConnectionFactory;
049: import javax.jms.TopicPublisher;
050: import javax.jms.TopicSession;
051: import javax.naming.InitialContext;
052: import javax.naming.NamingException;
053: import javax.rmi.PortableRemoteObject;
054:
055: import org.objectweb.jonas.common.Log;
056: import org.objectweb.jonas.jtests.util.Env;
057: import org.objectweb.util.monolog.api.BasicLevel;
058: import org.objectweb.util.monolog.api.Logger;
059:
060: /**
061: *
062: */
063: public class SenderSF implements SessionBean {
064:
065: static protected Logger logger = null;
066: SessionContext ejbContext;
067: private transient InitialContext ictx = null;
068: private transient TopicConnectionFactory tcf = null;
069: private transient TopicConnection tc = null;
070: private transient QueueConnectionFactory qcf = null;
071: private transient QueueConnection qc = null;
072: private transient MRecordHome ehome = null;
073: private static int count = 1;
074:
075: // ------------------------------------------------------------------
076: // SessionBean implementation
077: // ------------------------------------------------------------------
078:
079: /**
080: * Set the associated session context. The container calls this method
081: * after the instance creation.
082: * The enterprise Bean instance should store the reference to the context
083: * object in an instance variable.
084: * This method is called with no transaction context.
085: *
086: * @param sessionContext A SessionContext interface for the instance.
087: * @throws EJBException Thrown by the method to indicate a failure caused by
088: * a system-level error.
089: */
090: public void setSessionContext(SessionContext ctx) {
091: if (logger == null)
092: logger = Log.getLogger(Log.JONAS_TESTS_PREFIX);
093: logger.log(BasicLevel.DEBUG, "");
094: ejbContext = ctx;
095: }
096:
097: /**
098: * A container invokes this method before it ends the life of the session object.
099: * This happens as a result of a client's invoking a remove operation, or when a
100: * container decides to terminate the session object after a timeout.
101: * This method is called with no transaction context.
102: *
103: * @throws EJBException Thrown by the method to indicate a failure caused by
104: * a system-level error.
105: */
106: public void ejbRemove() {
107: logger.log(BasicLevel.DEBUG, "");
108: try {
109: tc.close();
110: qc.close();
111: } catch (Exception e) {
112: logger.log(BasicLevel.ERROR, "Exception on close:" + e);
113: }
114: }
115:
116: /**
117: * The Session bean must define 1 or more ejbCreate methods.
118: *
119: * @throws CreateException Failure to create a session EJB object.
120: */
121: public void ejbCreate() throws CreateException {
122: logger.log(BasicLevel.DEBUG, "");
123: // Lookup Connection Factories
124: try {
125: ictx = new InitialContext();
126: tcf = (TopicConnectionFactory) ictx.lookup("TCF");
127: qcf = (QueueConnectionFactory) ictx.lookup("QCF");
128: } catch (NamingException e) {
129: logger.log(BasicLevel.ERROR,
130: "SenderSF : Cannot lookup Connection Factories: "
131: + e);
132: throw new CreateException(
133: "SenderSF: Cannot lookup Connection Factories");
134: }
135:
136: // Create Connections
137: try {
138: qc = qcf.createQueueConnection();
139: tc = tcf.createTopicConnection();
140: } catch (JMSException e) {
141: logger.log(BasicLevel.ERROR,
142: "SenderSF : Cannot create connections: " + e);
143: throw new CreateException(
144: "SenderSF: Cannot create connections");
145: }
146:
147: // Lookup Entity Home for checking
148: String BEAN_HOME = "messageMRecordECHome";
149: try {
150: ehome = (MRecordHome) PortableRemoteObject.narrow(ictx
151: .lookup(BEAN_HOME), MRecordHome.class);
152: } catch (NamingException e) {
153: logger.log(BasicLevel.ERROR,
154: "SenderSF ejbCreate: Cannot get entity home: " + e);
155: throw new CreateException(
156: "SenderSF: Cannot get entity home");
157: }
158: }
159:
160: /**
161: * A container invokes this method on an instance before the instance
162: * becomes disassociated with a specific EJB object.
163: */
164: public void ejbPassivate() {
165: logger.log(BasicLevel.DEBUG, "");
166: }
167:
168: /**
169: * A container invokes this method when the instance is taken out of
170: * the pool of available instances to become associated with a specific
171: * EJB object.
172: */
173: public void ejbActivate() {
174: logger.log(BasicLevel.DEBUG, "");
175: }
176:
177: // ------------------------------------------------------------------
178: // private methods
179: // ------------------------------------------------------------------
180:
181: /**
182: * return a unique identifier
183: */
184: private String getUUID() {
185: long uuid;
186: synchronized (getClass()) {
187: uuid = System.currentTimeMillis() * 256 + count;
188: count++;
189: }
190: return String.valueOf(uuid);
191: }
192:
193: // ------------------------------------------------------------------
194: // Sender implementation
195: // ------------------------------------------------------------------
196:
197: /**
198: * send a message on topic
199: * @param String destination
200: * @param int value set in message
201: * @param int nb of messages sent
202: */
203: public void sendOnTopic(String dest, int val, int nb) {
204: logger.log(BasicLevel.DEBUG, "");
205: // Lookup destinations
206: Topic topic = null;
207: try {
208: topic = (Topic) ictx.lookup(dest);
209: } catch (NamingException e) {
210: throw new EJBException("sendOnTopic: Cannot lookup " + dest);
211: }
212:
213: // Create TopicSession
214: // Create Session at each request : Avoids the bug in JMS
215: // about Session not enlisted in transactions if open first.
216: TopicSession ss = null;
217: try {
218: // (true, 0) are the recommanded args, although they are not taken
219: // in account by the container.
220: ss = tc.createTopicSession(true, 0);
221: } catch (JMSException e) {
222: throw new EJBException("Cannot create Session: " + e);
223: }
224:
225: // Create the TopicPublisher
226: TopicPublisher publisher = null;
227: try {
228: publisher = ss.createPublisher(topic);
229: } catch (JMSException e) {
230: throw new EJBException("Cannot create TopicPublisher: " + e);
231: }
232:
233: // Publish messages on the topic
234: try {
235: for (int i = 0; i < nb; i++) {
236: MapMessage mess = ss.createMapMessage();
237: mess.setString("Id", getUUID());
238: mess.setString("Text", dest);
239: mess.setInt("Value", val);
240: publisher.publish(mess);
241: }
242: } catch (JMSException e) {
243: throw new EJBException("Cannot send message: " + e);
244: }
245:
246: // Close Session: This is mandatory for the correct behaviour of
247: // XA protocol. An XA END must be sent before commit or rollback.
248: try {
249: ss.close();
250: } catch (JMSException e) {
251: throw new EJBException("Cannot close session: " + e);
252: }
253: }
254:
255: /**
256: * send messages on topic (transacted)
257: * @param String destination
258: * @param int value set in message
259: * @param int nb of messages sent
260: */
261: public void sendOnTopicTx(String dest, int val, int nb) {
262: sendOnTopic(dest, val, nb);
263: }
264:
265: /**
266: * send a message on queue
267: * @param String destination
268: * @param int value set in message
269: * @param int nb of messages sent
270: */
271: public void sendOnQueue(String dest, int val, int nb) {
272: logger.log(BasicLevel.DEBUG, "");
273: // Lookup destination
274: Queue queue = null;
275: try {
276: queue = (Queue) ictx.lookup(dest);
277: } catch (NamingException e) {
278: throw new EJBException("sendOnQueue: Cannot lookup " + dest);
279: }
280:
281: // Create QueueSession
282: // Must create Session at each request because of the bug in JMS
283: // about Session not enlisted in transactions if open first.
284: QueueSession ss = null;
285: try {
286: // (true, 0) are the recommanded args, although they are not taken
287: // in account by the container.
288: ss = qc.createQueueSession(true, 0);
289: } catch (JMSException e) {
290: throw new EJBException("Cannot create Session: " + e);
291: }
292:
293: // Create the QueueSender
294: QueueSender sender = null;
295: try {
296: sender = ss.createSender(queue);
297: } catch (JMSException e) {
298: throw new EJBException("Cannot create QueueSender: " + e);
299: }
300:
301: // Publish messages on the queue
302: try {
303: for (int i = 0; i < nb; i++) {
304: MapMessage mess = ss.createMapMessage();
305: mess.setString("Id", getUUID());
306: mess.setString("Text", dest);
307: mess.setInt("Value", val);
308: sender.send(mess);
309: }
310: } catch (JMSException e) {
311: throw new EJBException("Cannot send message: " + e);
312: }
313:
314: // Close Session: This is mandatory for the correct behaviour of
315: // XA protocol. An XA END must be sent before commit or rollback.
316: try {
317: ss.close();
318: } catch (JMSException e) {
319: throw new EJBException("Cannot close session: " + e);
320: }
321: }
322:
323: /**
324: * send messages on queue (transacted)
325: * @param String destination
326: * @param int value set in message
327: * @param int nb of messages sent
328: */
329: public void sendOnQueueTx(String dest, int val, int nb) {
330: sendOnQueue(dest, val, nb);
331: }
332:
333: /**
334: * Checking send methods
335: * @param int value looked in messages received
336: * @param int nb of messages that could be received
337: * @param int nb of seconds max to wait for all messages
338: * @return actual nb of messages received
339: */
340: public int check(int val, int nb, int sec) {
341: Collection elist = null;
342: int retval = 0;
343: for (int i = 0; i <= sec; i++) {
344: logger.log(BasicLevel.DEBUG, "sec : " + i + "/" + sec);
345: try {
346: elist = ehome.findByValue(val);
347: retval = elist.size();
348: if (retval >= nb) {
349: // clean database before returning
350: Iterator it = elist.iterator();
351: while (it.hasNext()) {
352: MRecord ent = (MRecord) PortableRemoteObject
353: .narrow(it.next(), MRecord.class);
354: try {
355: ent.remove();
356: } catch (Exception e) {
357: throw new EJBException("Error on remove");
358: }
359: }
360: return retval;
361: }
362: } catch (FinderException e) {
363: } catch (RemoteException e) {
364: return retval;
365: }
366: try {
367: Thread.sleep(1000);
368: } catch (InterruptedException e) {
369: }
370: }
371: return retval;
372: }
373:
374: /**
375: * Clean all entity beans for this value
376: */
377: public void clean(int val) {
378: logger.log(BasicLevel.DEBUG, "");
379: Collection elist = null;
380: try {
381: elist = ehome.findByValue(val);
382: } catch (FinderException e) {
383: return;
384: } catch (Exception e) {
385: throw new EJBException("Error on find");
386: }
387: Iterator it = elist.iterator();
388: while (it.hasNext()) {
389: MRecord ent = (MRecord) PortableRemoteObject.narrow(it
390: .next(), MRecord.class);
391: try {
392: ent.remove();
393: } catch (Exception e) {
394: throw new EJBException("Error on remove");
395: }
396: }
397: }
398:
399: }
|