001: /**
002: * JOnAS: Java(TM) Open Application Server
003: * Copyright (C) 1999-2004 Bull S.A.
004: * Contact: jonas-team@objectweb.org
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
019: * USA
020: *
021: * --------------------------------------------------------------------------
022: * $Id: JMdbFactory.java 9860 2006-11-23 16:40:22Z durieuxp $
023: * --------------------------------------------------------------------------
024: */package org.objectweb.jonas_ejb.container;
025:
026: import java.lang.reflect.Method;
027: import java.util.ArrayList;
028: import java.util.List;
029: import java.util.ListIterator;
030:
031: import javax.ejb.EJBException;
032: import javax.ejb.MessageDrivenBean;
033: import javax.ejb.MessageDrivenContext;
034: import javax.ejb.Timer;
035: import javax.ejb.TimerService;
036: import javax.jms.ConnectionConsumer;
037: import javax.jms.JMSException;
038: import javax.jms.MessageListener;
039: import javax.jms.Queue;
040: import javax.jms.ServerSession;
041: import javax.jms.ServerSessionPool;
042: import javax.jms.Session;
043: import javax.jms.Topic;
044: import javax.jms.XAQueueConnection;
045: import javax.jms.XAQueueConnectionFactory;
046: import javax.jms.XATopicConnection;
047: import javax.jms.XATopicConnectionFactory;
048: import javax.naming.Context;
049: import javax.transaction.Transaction;
050:
051: import org.objectweb.jonas_ejb.deployment.api.MessageDrivenDesc;
052: import org.objectweb.jonas_ejb.deployment.api.MethodDesc;
053:
054: import org.objectweb.jonas_jms.api.JmsManager;
055:
056: import org.objectweb.util.monolog.api.BasicLevel;
057:
058: /**
059: * This class is a factory for a Message Driven Bean
060: * There is one such class per MDB class.
061: * Contains all information related to the bean and set up all JMS environment for the bean
062: * It manages a ServerSession pool to server MDB requests.
063: * @author Philippe Coq, Philippe Durieux
064: */
065: public class JMdbFactory extends JFactory implements ServerSessionPool {
066:
067: /**
068: * JMS Manager
069: */
070: private JmsManager jms = null;
071:
072: /**
073: * Connection Consumer for this message driven bean
074: */
075: private ConnectionConsumer cc = null;
076:
077: /**
078: * pool of ServerSession objects
079: */
080: private List sspool = new ArrayList();
081:
082: /**
083: * number of instances in the pool
084: */
085: private int instanceCount = 0;
086:
087: /**
088: * initial value for pool size
089: */
090: private int minPoolSize = 0;
091:
092: /**
093: * nb max of instances in pool
094: */
095: private int maxCacheSize = 0;
096:
097: /**
098: * JMS Topic Connection
099: * always use XA Connections for transactions.
100: */
101: private XATopicConnection tconn = null;
102:
103: /**
104: * JMS Queue Connection (Topic or Queue)
105: * always use XA Connections for transactions.
106: */
107: private XAQueueConnection qconn = null;
108:
109: /**
110: * Constructor
111: * @param dd Message Driven Descriptor
112: * @param cont Container where this bean is defined
113: */
114: public JMdbFactory(MessageDrivenDesc dd, JContainer cont) {
115: super (dd, cont);
116:
117: // Check if tx managed by the bean or the container
118: txbeanmanaged = dd.isBeanManagedTransaction();
119:
120: // Check that JMS Service has been run in the server
121: jms = cont.getJmsManager();
122: if (jms == null) {
123: TraceEjb.logger
124: .log(BasicLevel.ERROR,
125: "cannot deploy a message driven bean without the JMS Service");
126: throw new EJBException("JMS Service must be run");
127: }
128:
129: // Create a Connection Consumer, depending on Deployment Descriptor
130: String selector = dd.getSelector();
131: String dest = dd.getDestinationJndiName();
132:
133: // Get the number max of messages sent on a Session at one time.
134: // There is a bug in Joram: If the number is greater than 1, Joram
135: // will wait until this nb is reached!
136: // We set the number max to 1 to work around this bug.
137: // LATER: maxMessage = dd.getMaxMessages(), with this value configurable.
138: int maxMessages = 1;
139:
140: if (dest == null) {
141: throw new EJBException(
142: "The destination JNDI name is null in bean "
143: + dd.getEjbName());
144: }
145:
146: try {
147: if (dd.isTopicDestination()) {
148: // topic
149: XATopicConnectionFactory tcf = jms
150: .getXATopicConnectionFactory();
151: tconn = tcf.createXATopicConnection();
152: Topic t = jms.getTopic(dest);
153: if (dd.isSubscriptionDurable()) {
154: if (TraceEjb.isDebugJms()) {
155: TraceEjb.mdb.log(BasicLevel.DEBUG,
156: "createDurableConnectionConsumer for "
157: + ejbname);
158: }
159: cc = tconn.createDurableConnectionConsumer(t,
160: ejbname, selector, this , maxMessages);
161: } else {
162: if (TraceEjb.isDebugJms()) {
163: TraceEjb.mdb.log(BasicLevel.DEBUG,
164: "createConnectionConsumer for " + dest);
165: }
166: cc = tconn.createConnectionConsumer(t, selector,
167: this , maxMessages);
168: }
169: tconn.start();
170: } else {
171: // queue
172: XAQueueConnectionFactory qcf = jms
173: .getXAQueueConnectionFactory();
174: qconn = qcf.createXAQueueConnection();
175: Queue q = jms.getQueue(dest);
176: if (TraceEjb.isDebugJms()) {
177: TraceEjb.mdb.log(BasicLevel.DEBUG,
178: "createConnectionConsumer for " + dest);
179: }
180: cc = qconn.createConnectionConsumer(q, selector, this ,
181: maxMessages);
182: qconn.start();
183: }
184: } catch (Exception e) {
185: throw new EJBException(
186: "Cannot create connection consumer in bean "
187: + dd.getEjbName() + " :", e);
188: }
189:
190: minPoolSize = dd.getPoolMin();
191: maxCacheSize = dd.getCacheMax();
192: if (TraceEjb.isDebugSwapper()) {
193: TraceEjb.swapper.log(BasicLevel.DEBUG, " maxCacheSize = "
194: + maxCacheSize + " minPoolSize = " + minPoolSize);
195: }
196: }
197:
198: // ---------------------------------------------------------------
199: // Specific BeanFactory implementation
200: // ---------------------------------------------------------------
201:
202: /**
203: * Init pool of instances
204: */
205: public void initInstancePool() {
206: if (minPoolSize != 0) {
207: TraceEjb.mdb.log(BasicLevel.INFO, "pre-allocate a set of "
208: + minPoolSize + " message driven bean instances");
209: // pre-allocate a set of ServerSession
210: synchronized (sspool) {
211: for (int i = 0; i < minPoolSize; i++) {
212: ServerSession ss = null;
213: try {
214: ss = createNewInstance();
215: sspool.add(ss);
216: } catch (Exception e) {
217: TraceEjb.mdb.log(BasicLevel.ERROR,
218: "cannot init pool of instances ");
219: throw new EJBException(
220: "cannot init pool of instances ", e);
221: }
222: }
223: }
224: }
225: }
226:
227: /**
228: * @return the size of the ServerSessionPool
229: */
230: public int getPoolSize() {
231: return sspool.size();
232: }
233:
234: /**
235: * stop this EJB.
236: * call ejbRemove on all MDB
237: * close the connection consumer
238: * Stop the threads and remove the beans
239: */
240: public void stop() {
241: if (TraceEjb.isDebugJms()) {
242: TraceEjb.mdb.log(BasicLevel.DEBUG, "");
243: }
244: try {
245: cc.close();
246: if (tconn != null) {
247: tconn.close();
248: }
249: if (qconn != null) {
250: qconn.close();
251: }
252: } catch (JMSException e) {
253: TraceEjb.logger.log(BasicLevel.WARN,
254: "unregister: Cannot close Connection Consumer");
255: }
256: stopContainer();
257: }
258:
259: /**
260: * synchronize bean instances if needed
261: */
262: public void syncDirty(boolean notused) {
263: }
264:
265: /**
266: * @return the home if exist
267: */
268: public JHome getHome() {
269: return null;
270: }
271:
272: /**
273: * @return the local home if exist
274: */
275: public JLocalHome getLocalHome() {
276: return null;
277: }
278:
279: // ---------------------------------------------------------------
280: // ServerSessionPool Implementation
281: // ---------------------------------------------------------------
282:
283: /**
284: * Returns a server session from the pool. If pool is empty, creates a new one.
285: *
286: * @return Returns a server session from the pool.
287: * @exception JMSException - if an application server fails to return a Server Session
288: * out of its server session pool.
289: */
290: public ServerSession getServerSession() throws JMSException {
291: if (TraceEjb.isDebugJms()) {
292: TraceEjb.mdb.log(BasicLevel.DEBUG, "");
293: }
294:
295: return getNewInstance(true);
296: }
297:
298: // ---------------------------------------------------------------
299: // Other methods
300: // ---------------------------------------------------------------
301:
302: /**
303: * put the ServerSession back to the pool
304: * @param ss The ServerSession
305: */
306: public void releaseServerSession(ServerSession ss) {
307: if (TraceEjb.isDebugJms()) {
308: TraceEjb.mdb.log(BasicLevel.DEBUG, "");
309: }
310:
311: synchronized (sspool) {
312: sspool.add(ss);
313: if (TraceEjb.isDebugSwapper()) {
314: TraceEjb.swapper.log(BasicLevel.DEBUG, "notifyAll ");
315: }
316: sspool.notifyAll();
317: }
318: if (TraceEjb.isDebugJms()) {
319: TraceEjb.mdb.log(BasicLevel.DEBUG, "nb instances "
320: + getCacheSize());
321: }
322:
323: }
324:
325: // ---------------------------------------------------------------
326: // other public methods
327: // ---------------------------------------------------------------
328:
329: /**
330: * Obtains the TimerService associated for this Bean
331: * @return a JTimerService instance.
332: */
333: public TimerService getTimerService() {
334: if (myTimerService == null) {
335: // TODO : Check that instance implements TimedObject ?
336: myTimerService = new JTimerService(this );
337: }
338: return myTimerService;
339: }
340:
341: /**
342: * @return min pool size
343: * for Jmx
344: */
345: public int getMinPoolSize() {
346: return minPoolSize;
347: }
348:
349: /**
350: * @return max cache size
351: * for Jmx
352: */
353: public int getMaxCacheSize() {
354: return maxCacheSize;
355: }
356:
357: /**
358: * @return current cache size ( = nb of instance created)
359: * for Jmx
360: */
361: public int getCacheSize() {
362: return instanceCount;
363: }
364:
365: /**
366: * @return the Transaction Attribute
367: */
368: public int getTransactionAttribute() {
369: return ((MessageDrivenDesc) dd).getTxAttribute();
370: }
371:
372: /**
373: * For Message Driven Beans, only 2 cases are possible.
374: * @param rctx The Request Context
375: */
376: public void checkTransaction(RequestCtx rctx) {
377: if (rctx.txAttr == MethodDesc.TX_REQUIRED) {
378: try {
379: if (tm.getTransaction() != null) {
380: // This should not occur (DEBUG)
381: TraceEjb.logger
382: .log(BasicLevel.ERROR,
383: "Transaction already opened by this thread.");
384: TraceEjb.logger.log(BasicLevel.ERROR,
385: "Transaction status = " + tm.getStatus());
386: TraceEjb.logger.log(BasicLevel.ERROR,
387: "Transaction = " + tm.getTransaction());
388: Thread.dumpStack();
389: return;
390: }
391: tm.begin();
392: rctx.mustCommit = true;
393: rctx.currTx = tm.getTransaction();
394: if (TraceEjb.isDebugTx()) {
395: TraceEjb.tx.log(BasicLevel.DEBUG,
396: "Transaction started: " + rctx.currTx);
397: }
398: } catch (Exception e) {
399: // No exception raised in case of MDB
400: TraceEjb.logger.log(BasicLevel.ERROR,
401: "cannot start tx", e);
402: return;
403: }
404: }
405: }
406:
407: /**
408: * Reduce number of instances in memory in the free list
409: * we reduce to the minPoolSize
410: */
411: public void reduceCache() {
412: // reduce the pool to the minPoolSize
413: int poolsz = minPoolSize;
414: synchronized (sspool) {
415: if (TraceEjb.isDebugSwapper()) {
416: TraceEjb.swapper.log(BasicLevel.DEBUG, "try to reduce "
417: + sspool.size() + " to " + poolsz);
418: }
419: while (sspool.size() > poolsz) {
420: ListIterator i = sspool.listIterator();
421: if (i.hasNext()) {
422: i.next();
423: i.remove();
424: instanceCount--;
425: }
426: }
427: }
428: if (TraceEjb.isDebugSwapper()) {
429: TraceEjb.swapper.log(BasicLevel.DEBUG, "cacheSize= "
430: + getCacheSize());
431: }
432: }
433:
434: /**
435: * Notify a timeout for this bean
436: * @param timer timer whose expiration caused this notification.
437: */
438: public void notifyTimeout(Timer timer) {
439: if (stopped) {
440: TraceEjb.mdb.log(BasicLevel.WARN, "Container stopped");
441: return;
442: }
443: if (TraceEjb.isDebugJms()) {
444: TraceEjb.mdb.log(BasicLevel.DEBUG, "");
445: }
446:
447: // We need an instance from the pool to process the timeout.
448: JMessageDrivenBean jmdb = null;
449: jmdb = getNewInstance(false);
450:
451: // deliver the timeout to the bean
452: jmdb.deliverTimeout(timer);
453:
454: // release the instance
455: releaseServerSession(jmdb);
456: }
457:
458: // ---------------------------------------------------------------
459: // private methods
460: // ---------------------------------------------------------------
461:
462: /**
463: * Returns a new instance of the bean. Try to get one from the pool,
464: * and create a new one if the pool is empty.
465: * @param canwait true if can wait if max-cache-size reached.
466: * @return Returns a JMessageDrivenBean instance from the pool.
467: */
468: private JMessageDrivenBean getNewInstance(boolean canwait) {
469: if (TraceEjb.isDebugJms()) {
470: TraceEjb.mdb.log(BasicLevel.DEBUG, "");
471: }
472:
473: // try to get one from the Pool
474: JMessageDrivenBean ss = null;
475:
476: // try to find a free context in the pool
477: synchronized (sspool) {
478: if (!sspool.isEmpty()) {
479: try {
480: ss = (JMessageDrivenBean) sspool.remove(0);
481: return ss;
482: } catch (IndexOutOfBoundsException ex) {
483: // This should never happen
484: TraceEjb.logger.log(BasicLevel.ERROR, "exception:"
485: + ex);
486: throw new EJBException(
487: "Cannot get an instance from the pool", ex);
488: }
489: } else {
490: if (TraceEjb.isDebugJms()) {
491: TraceEjb.mdb.log(BasicLevel.DEBUG, "pool is empty");
492: }
493: if (maxCacheSize == 0 || instanceCount < maxCacheSize
494: || !canwait) {
495: // Pool is empty creates the ServerSession object
496: try {
497: ss = createNewInstance();
498: } catch (Exception e) {
499: TraceEjb.logger.log(BasicLevel.ERROR,
500: "exception:" + e);
501: throw new EJBException(
502: "Cannot create a new instance", e);
503: }
504: } else {
505: while (sspool.isEmpty()) {
506: if (TraceEjb.isDebugSwapper()) {
507: TraceEjb.swapper
508: .log(BasicLevel.DEBUG,
509: "sspool.isEmpty() = true --> wait()");
510: }
511: try {
512: sspool.wait();
513: if (TraceEjb.isDebugSwapper()) {
514: TraceEjb.swapper.log(BasicLevel.DEBUG,
515: "sspool notified");
516: }
517: } catch (InterruptedException e) {
518: if (TraceEjb.isDebugSwapper()) {
519: TraceEjb.swapper
520: .log(
521: BasicLevel.DEBUG,
522: "sspool waiting interrupted",
523: e);
524: }
525: } catch (Exception e) {
526: throw new EJBException(
527: "synchronization pb", e);
528: }
529: }
530: try {
531: ListIterator i = sspool.listIterator();
532: if (i.hasNext()) {
533: ss = (JMessageDrivenBean) i.next();
534: i.remove();
535: }
536: return ss;
537: } catch (IndexOutOfBoundsException ex) {
538: // pool is empty
539: }
540: }
541:
542: }
543: if (TraceEjb.isDebugSwapper()) {
544: TraceEjb.swapper.log(BasicLevel.DEBUG, "nb instances "
545: + getCacheSize());
546: }
547: return ss;
548: }
549: }
550:
551: /**
552: * Create a new instance of the bean
553: * @throws Exception if MDB instanciation is not possible
554: * @return Returns a new JMessageDrivenBean instance.
555: */
556: private JMessageDrivenBean createNewInstance() throws Exception {
557: if (TraceEjb.isDebugJms()) {
558: TraceEjb.mdb.log(BasicLevel.DEBUG, "");
559: }
560: Session sess = null;
561: JMessageDrivenBean ss = null;
562: MessageDrivenDesc mdd = (MessageDrivenDesc) dd;
563: if (tconn != null) {
564: if (mdd.isRequired()) {
565: sess = tconn.createXATopicSession();
566: } else {
567: sess = tconn.createTopicSession(false, mdd
568: .getAcknowledgeMode());
569: }
570: } else if (qconn != null) {
571: if (mdd.isRequired()) {
572: sess = qconn.createXAQueueSession();
573: } else {
574: sess = qconn.createQueueSession(false, mdd
575: .getAcknowledgeMode());
576: }
577: } else {
578: TraceEjb.mdb.log(BasicLevel.ERROR,
579: "connection not initialized");
580: throw new Exception("JMS connection not initialized");
581: }
582:
583: // Set ContextClassLoader with the ejbclassloader.
584: // This is necessary in case ejbCreate calls another bean in the same jar.
585: ClassLoader old = Thread.currentThread()
586: .getContextClassLoader();
587: Thread.currentThread().setContextClassLoader(myClassLoader());
588:
589: // Creates the new instance
590: MessageDrivenBean mdb = null;
591: try {
592: mdb = (MessageDrivenBean) beanclass.newInstance();
593: } catch (Exception e) {
594: TraceEjb.logger.log(BasicLevel.ERROR,
595: "failed to create instance:", e);
596: resetToOldClassLoader(old);
597: throw new EJBException(
598: "Container failed to create instance of Message Driven Bean",
599: e);
600: }
601:
602: // Instanciates a new JMessageDrivenBean object
603: // and set it as the MessageListener for this Session.
604: ss = new JMessageDrivenBean(this , sess, mdb, wm);
605: try {
606: sess.setMessageListener((MessageListener) ss);
607: } catch (JMSException je) {
608: resetToOldClassLoader(old);
609: throw je;
610: }
611:
612: // starts the bean instance: setMessageDrivenContext() + ejbCreate()
613: // see EJB spec. 2.0 page 322.
614: // Both operations must be called with the correct ComponentContext
615: Context ctxsave = setComponentContext();
616: mdb.setMessageDrivenContext((MessageDrivenContext) ss);
617: try {
618: Method m = beanclass.getMethod("ejbCreate", (Class[]) null);
619: boolean bm = m.isAccessible();
620: if (!bm) {
621: m.setAccessible(true);
622: }
623: m.invoke(mdb, (Object[]) null);
624: m.setAccessible(bm);
625: } catch (Exception e) {
626: TraceEjb.logger
627: .log(
628: BasicLevel.ERROR,
629: "cannot call ejbCreate on message driven bean instance ",
630: e);
631: throw new EJBException(
632: " Container fails to call ejbCreate on message driven bean instance",
633: e);
634: } finally {
635: resetToOldClassLoader(old);
636: resetComponentContext(ctxsave);
637: }
638:
639: synchronized (sspool) {
640: instanceCount++;
641: }
642: return ss;
643: }
644:
645: /**
646: * Reset currentThread context ClassLoader to a given ClassLoader
647: * @param old Old ClassLoader to reuse
648: */
649: private void resetToOldClassLoader(ClassLoader old) {
650: if (old != null) {
651: Thread.currentThread().setContextClassLoader(old);
652: }
653: }
654:
655: /*
656: * Make sense only for entities
657: */
658: public void storeInstances(Transaction tx) {
659: // unused
660: }
661: }
|