001: /*
002: * JOnAS: Java(TM) Open Application Server
003: * Copyright (C) 1999 Bull S.A.
004: * Contact: jonas-team@objectweb.org
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
019: * USA
020: *
021: * Initial developer(s):
022: * Contributor(s):
023: * Christophe Ney: for making easier Enhydra integration
024: * Philippe Durieux
025: * Jeff Mesnil: for JORAM 3.0 integration
026: * --------------------------------------------------------------------------
027: * $Id: JmsManagerImpl.java 5560 2004-10-06 14:41:54Z benoitf $
028: * --------------------------------------------------------------------------
029: */
030:
031: package org.objectweb.jonas_jms;
032:
033: import java.util.Enumeration;
034: import java.util.Hashtable;
035: import java.util.Vector;
036:
037: import javax.jms.ConnectionFactory;
038: import javax.jms.JMSException;
039: import javax.jms.Queue;
040: import javax.jms.QueueConnectionFactory;
041: import javax.jms.Topic;
042: import javax.jms.TopicConnectionFactory;
043: import javax.jms.XAConnectionFactory;
044: import javax.jms.XAQueueConnectionFactory;
045: import javax.jms.XATopicConnectionFactory;
046: import javax.naming.InitialContext;
047: import javax.naming.NamingException;
048:
049: import org.objectweb.jonas_jms.api.JmsAdministration;
050: import org.objectweb.jonas_jms.api.JmsManager;
051: import org.objectweb.transaction.jta.TransactionManager;
052: import org.objectweb.util.monolog.api.BasicLevel;
053:
054: /**
055: * JmsManager implementation
056: * This singleton class must exist in each JOnAS server that want to use JMS
057: * @author Laurent Chauvirey, Frederic Maistre, Nicolas Tachker
058: * Contributor(s): <p>
059: * Christophe Ney: for making easier Enhydra integration <p>
060: * Philippe Durieux <p>
061: * Jeff Mesnil: for JORAM 3.0 integration <p>
062: * Philippe Coq: for JORAM 3.1 (JMS 1.1) integration <p>
063: * Adriana Danes : update with support of JMS resource monitoring
064: */
065: public class JmsManagerImpl implements JmsManager, JmsJmxManagement {
066:
067: private JmsAdministration momadmin = null;
068: private InitialContext ictx = null;
069: private ConnectionFactory cf = null;
070: private TopicConnectionFactory tcf = null;
071: private QueueConnectionFactory qcf = null;
072: private Hashtable queues = new Hashtable();
073: private Hashtable topics = new Hashtable();
074: private Vector namelist = new Vector();
075:
076: private static TransactionManager tm = null;
077: private static JmsManagerImpl unique = null;
078:
079: /**
080: * Private constructor to force only 1 instance.
081: */
082: private JmsManagerImpl() {
083: TraceJms.logger.log(BasicLevel.DEBUG, "");
084: }
085:
086: /**
087: * Get the JmsManager.
088: */
089: public static JmsManager getJmsManager() {
090: if (unique == null) {
091: unique = new JmsManagerImpl();
092: }
093: return (JmsManager) unique;
094: }
095:
096: /**
097: * Get the JmsJmxManagement.
098: */
099: public static JmsJmxManagement getJmsJmxManagement() {
100: if (unique == null) {
101: unique = new JmsManagerImpl();
102: }
103: return (JmsJmxManagement) unique;
104: }
105:
106: // -------------------------------------------------------------------
107: // Internal Methods
108: // -------------------------------------------------------------------
109:
110: /**
111: * return the Transaction Manager
112: */
113: public static TransactionManager getTransactionManager() {
114: return tm;
115: }
116:
117: // -------------------------------------------------------------------
118: // JmsManager Implementation
119: // -------------------------------------------------------------------
120:
121: /**
122: * Initialization of the JmsManager
123: * @param class JmsAdministration class .
124: * @param boolean true for launchin the MOM inside the JOnAS server.
125: * @param String url connexion that must be used as soon as we don't use
126: * the default value of a3server.xml
127: */
128: public void init(Class cl, boolean collocated, String url,
129: TransactionManager trm) throws Exception {
130: TraceJms.logger.log(BasicLevel.DEBUG, "");
131: tm = trm;
132: int maxloops = collocated ? 1 : 5;
133: for (int i = 0; i < maxloops; i++) {
134: try {
135: // Create an InitialContext
136: ictx = new InitialContext();
137: // Create the MOM instance and start it
138: momadmin = (JmsAdministration) cl.newInstance();
139: momadmin.start(collocated, url);
140: break;
141: } catch (NamingException e) {
142: TraceJms.logger.log(BasicLevel.ERROR,
143: "cannot get Initial Context", e);
144: throw e;
145: } catch (NullPointerException e) {
146: TraceJms.logger.log(BasicLevel.ERROR, "exception: ", e);
147: throw e;
148: } catch (Exception e) {
149: if (i < maxloops) {
150: if (TraceJms.isDebug())
151: TraceJms.logger.log(BasicLevel.DEBUG,
152: "cannot reach the MOM - retrying...");
153: try {
154: // Thread.sleep() -> force current Thread to sleep
155: Thread.sleep(2000 * (i + 1));
156: } catch (InterruptedException e2) {
157: throw new JMSException("Cannot reach the MOM");
158: }
159: } else {
160: TraceJms.logger.log(BasicLevel.ERROR,
161: "cannot load admin class " + e);
162: throw e;
163: }
164: }
165: }
166: // We must register these factories in JNDI before any client
167: // try to get them.
168: getConnectionFactory();
169: getTopicConnectionFactory();
170: getQueueConnectionFactory();
171: }
172:
173: /**
174: * Creation of an administered Object Queue and bind it in the registry
175: */
176: public Queue createQueue(String name) throws Exception {
177: Queue queue = null;
178: // Don't recreate if already found in JNDI
179: try {
180: queue = (Queue) ictx.lookup(name);
181: if (TraceJms.isDebug())
182: TraceJms.logger.log(BasicLevel.DEBUG, "queue " + name
183: + " already found");
184: return queue;
185: } catch (NamingException ex) {
186: }
187: if (TraceJms.isDebug())
188: TraceJms.logger.log(BasicLevel.DEBUG,
189: "creating and registering queue " + name);
190: queue = momadmin.createQueue(name);
191: // rebind is already done in momadmin.createQueue()
192: //ictx.rebind(name, queue);
193: namelist.addElement(name);
194: queues.put(name, queue);
195: return queue;
196: }
197:
198: /**
199: * Creation of an administered Object Topic and bind it in the registry
200: */
201: public Topic createTopic(String name) throws Exception {
202: Topic topic = null;
203: // Don't recreate if already found in JNDI
204: try {
205: topic = (Topic) ictx.lookup(name);
206: if (TraceJms.isDebug())
207: TraceJms.logger.log(BasicLevel.DEBUG, "topic " + name
208: + " already found");
209: return topic;
210: } catch (NamingException ex) {
211: }
212: if (TraceJms.isDebug())
213: TraceJms.logger.log(BasicLevel.DEBUG,
214: "creating and registering topic " + name);
215: topic = momadmin.createTopic(name);
216: // rebind is already done in momadmin.createTopic()
217: //ictx.rebind(name, topic);
218: namelist.addElement(name);
219: topics.put(name, topic);
220: return topic;
221: }
222:
223: /**
224: * Get the unique ConnectionFactory
225: */
226: public ConnectionFactory getConnectionFactory() {
227: if (cf == null) {
228: String name = "CF";
229: cf = new org.objectweb.jonas_jms.JConnectionFactory(name);
230: try {
231: if (TraceJms.isDebug())
232: TraceJms.logger.log(BasicLevel.DEBUG,
233: "creating and registering " + name);
234: ictx.rebind(name, cf);
235: } catch (NamingException e) {
236: TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind "
237: + name + " :" + e);
238: }
239: }
240: return cf;
241: }
242:
243: /**
244: * Get the unique TopicConnectionFactory
245: */
246: public TopicConnectionFactory getTopicConnectionFactory() {
247: if (tcf == null) {
248: String name = "TCF";
249: tcf = new org.objectweb.jonas_jms.JTopicConnectionFactory(
250: name);
251: try {
252: if (TraceJms.isDebug())
253: TraceJms.logger.log(BasicLevel.DEBUG,
254: "creating and registering " + name);
255: ictx.rebind(name, tcf);
256: } catch (NamingException e) {
257: TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind "
258: + name + " :" + e);
259: }
260: }
261: return tcf;
262: }
263:
264: /*
265: * Get the unique QueueConnectionFactory
266: */
267: public QueueConnectionFactory getQueueConnectionFactory() {
268: if (qcf == null) {
269: String name = "QCF";
270: qcf = new org.objectweb.jonas_jms.JQueueConnectionFactory(
271: name);
272: try {
273: if (TraceJms.isDebug())
274: TraceJms.logger.log(BasicLevel.DEBUG,
275: "creating and registering " + name);
276: ictx.rebind(name, qcf);
277: } catch (NamingException e) {
278: TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind "
279: + name + " :" + e);
280: }
281: }
282: return qcf;
283: }
284:
285: /**
286: * Get Default XAConnectionFactory
287: */
288: public XAConnectionFactory getXAConnectionFactory() {
289: return momadmin.getXAConnectionFactory();
290: }
291:
292: /**
293: * Get Default XATopicConnectionFactory
294: */
295: public XATopicConnectionFactory getXATopicConnectionFactory() {
296: return momadmin.getXATopicConnectionFactory();
297: }
298:
299: /**
300: * Get Default XAQueueConnectionFactory
301: */
302: public XAQueueConnectionFactory getXAQueueConnectionFactory() {
303: return momadmin.getXAQueueConnectionFactory();
304: }
305:
306: /**
307: * Get Queue (creates it if not exist)
308: */
309: public Queue getQueue(String name) throws Exception {
310: Queue q = (Queue) queues.get(name);
311: if (q == null) {
312: q = createQueue(name);
313: }
314: return q;
315: }
316:
317: /**
318: * Get Topic (creates it if not exist)
319: */
320: public Topic getTopic(String name) throws Exception {
321: Topic t = (Topic) topics.get(name);
322: if (t == null) {
323: t = createTopic(name);
324: }
325: return t;
326: }
327:
328: /**
329: * Get Topic Names
330: */
331: public Enumeration getTopicsNames() {
332: return topics.keys();
333: }
334:
335: /**
336: * Get Queue Names
337: */
338: public Enumeration getQueuesNames() {
339: return queues.keys();
340: }
341:
342: /**
343: * Terminate the administering process
344: */
345: public void stop() throws Exception {
346: // Before stopping the MOM clean up the connection pools
347: if (cf != null) {
348: ((JConnectionFactory) cf).cleanPool();
349: }
350:
351: if (tcf != null) {
352: ((JConnectionFactory) tcf).cleanPool();
353: }
354:
355: if (qcf != null) {
356: ((JConnectionFactory) qcf).cleanPool();
357: }
358:
359: // Stop the MOM
360: if (momadmin != null) {
361: momadmin.stop();
362: }
363:
364: // clean up JNDI
365: Enumeration ln = namelist.elements();
366: while (ln.hasMoreElements()) {
367: String name = (String) ln.nextElement();
368: try {
369: ictx.unbind(name);
370: if (TraceJms.isDebug())
371: TraceJms.logger.log(BasicLevel.DEBUG, "unbind "
372: + name);
373: } catch (NamingException e) {
374: if (TraceJms.isDebug())
375: TraceJms.logger.log(BasicLevel.ERROR,
376: "cannot unbind " + name);
377: }
378: }
379:
380: }
381:
382: // -------------------------------------------------------------
383: // Management methods
384: // -------------------------------------------------------------
385:
386: /**
387: * @return the current number of Jms Connection Factory
388: */
389: public int getCurrentNumberOfJmsConnectionFactory() {
390: int result = 0;
391: if (cf != null)
392: result++;
393: return result;
394: }
395:
396: /**
397: * @return the current number of Topic Jms Connection Factory
398: */
399: public int getCurrentNumberOfJmsTopicConnectionFactory() {
400: int result = 0;
401: if (tcf != null)
402: result++;
403: return result;
404: }
405:
406: /**
407: * @return the current number of Queue Jms Connection Factory
408: */
409: public int getCurrentNumberOfJmsQueueConnectionFactory() {
410: int result = 0;
411: if (qcf != null)
412: result++;
413: return result;
414: }
415:
416: /**
417: * @return the current number of Topic Jms Destination
418: */
419: public int getCurrentNumberOfJmsTopicDestination() {
420: return topics.size();
421: }
422:
423: /**
424: * @return the current number of Queue Jms Destination
425: */
426: public int getCurrentNumberOfJmsQueueDestination() {
427: return queues.size();
428: }
429:
430: /**
431: * Remove a Jms destination
432: * @param String jndi name
433: * @return the destination type : "queue" or "topic"
434: */
435: public String removeJmsDestination(String jndiName)
436: throws Exception {
437:
438: momadmin.deleteDestination(jndiName);
439:
440: String destType = null;
441: if ((queues.containsKey(jndiName))
442: || (topics.containsKey(jndiName))) {
443: try {
444: InitialContext ictx = new InitialContext();
445: ictx.unbind(jndiName);
446: Object o = null; // object to which the jndiName is mapped
447: o = queues.remove(jndiName);
448: if (o == null) {
449: // jndiName did not have a mapping, try in the topics hashtable
450: o = topics.remove(jndiName);
451: if (o != null) {
452: destType = "topic";
453: } else {
454: // nither queue nor topic ???
455: destType = "unknown";
456: }
457: } else {
458: destType = "queue";
459: }
460: return destType;
461: } catch (Exception ex) {
462: throw new Exception(
463: "JmsManagerImpl remove destination : cannot unbind "
464: + jndiName);
465: }
466: } else {
467: throw new Exception("Unexisting jms destination :"
468: + jndiName + " remove abord");
469: }
470: }
471:
472: /**
473: * @return String name of default Connection factory
474: */
475: public String getDefaultConnectionFactoryName() {
476: return "CF";
477: }
478:
479: /**
480: * @return String name of default Queue Connection factory
481: */
482: public String getDefaultQueueConnectionFactoryName() {
483: return "QCF";
484: }
485:
486: /**
487: * @return String name of default Topic Connection factory
488: */
489: public String getDefaultTopicConnectionFactoryName() {
490: return "TCF";
491: }
492:
493: // Monitoring methods
494:
495: /**
496: * Get the messaging mode a connection factory belongs to
497: * @param jndiName connection factory name
498: * @return messaging mode
499: * @exception failure on calling monitoring operation
500: */
501: public String getConnectionFactoryMode(String jndiName)
502: throws Exception {
503: TraceJms.logger.log(BasicLevel.DEBUG, "");
504:
505: if (jndiName.equals("CF"))
506: return "Point-To-Point and Publish/Subscribe";
507: else if (jndiName.equals("QCF"))
508: return "Point-To-Point";
509: else if (jndiName.equals("TCF"))
510: return "Publish/Subscribe";
511: else
512: throw new IllegalStateException("Unknown factory.");
513: }
514:
515: /**
516: * Get number of pending messages on a queue
517: * @param name the queue's jndi name
518: * @return number of pending messages
519: * @exception failure on calling monitoring operation
520: */
521: public int getPendingMessages(String jndiName) throws Exception {
522: TraceJms.logger.log(BasicLevel.DEBUG, "");
523: Queue queue = (Queue) queues.get(jndiName);
524: int n;
525: if (queue != null) {
526: // existent queue (created by JOnAS)
527: n = momadmin.getPendingMessages(queue);
528: } else {
529: throw new IllegalStateException(jndiName
530: + " not a queue created by the jms service");
531: }
532: return n;
533: }
534:
535: /**
536: * Get number of pending requests on a queue
537: * @param jndiName queue name
538: * @return number of pending requests
539: * @exception failure on calling monitoring operation
540: */
541: public int getPendingRequests(String jndiName) throws Exception {
542: TraceJms.logger.log(BasicLevel.DEBUG, "");
543: Queue queue = (Queue) queues.get(jndiName);
544: int n;
545: if (queue != null) {
546: // existent queue (created by JOnAS)
547: n = momadmin.getPendingRequests(queue);
548: } else {
549: throw new IllegalStateException(jndiName
550: + " not a queue created by the jms service");
551: }
552: return n;
553: }
554:
555: /**
556: * Get number of subscriptions on a topic
557: * @param jndiName topic name
558: * @return number of subscriptions
559: * @exception failure on calling monitoring operation
560: */
561: public int getSubscriptions(String jndiName) throws Exception {
562: Topic topic = (Topic) topics.get(jndiName);
563: int n;
564: if (topic != null) {
565: // existent topic (created by JOnAS)
566: n = momadmin.getSubscriptions(topic);
567: } else {
568: throw new IllegalStateException(jndiName
569: + " not a topic created by the jms service");
570: }
571: return n;
572: }
573: }
|