001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.adapter.jms;
023:
024: import java.io.PrintWriter;
025: import java.util.Collections;
026: import java.util.HashSet;
027: import java.util.Iterator;
028: import java.util.Set;
029: import java.util.Vector;
030:
031: import javax.jms.Connection;
032: import javax.jms.ExceptionListener;
033: import javax.jms.JMSException;
034: import javax.jms.QueueConnection;
035: import javax.jms.QueueSession;
036: import javax.jms.Session;
037: import javax.jms.TopicConnection;
038: import javax.jms.TopicSession;
039: import javax.jms.XAConnection;
040: import javax.jms.XAQueueConnection;
041: import javax.jms.XAQueueSession;
042: import javax.jms.XASession;
043: import javax.jms.XATopicConnection;
044: import javax.jms.XATopicSession;
045: import javax.naming.Context;
046: import javax.naming.InitialContext;
047: import javax.naming.NamingException;
048: import javax.resource.NotSupportedException;
049: import javax.resource.ResourceException;
050: import javax.resource.spi.ConnectionEvent;
051: import javax.resource.spi.ConnectionEventListener;
052: import javax.resource.spi.ConnectionRequestInfo;
053: import javax.resource.spi.IllegalStateException;
054: import javax.resource.spi.LocalTransaction;
055: import javax.resource.spi.ManagedConnection;
056: import javax.resource.spi.ManagedConnectionMetaData;
057: import javax.resource.spi.SecurityException;
058: import javax.security.auth.Subject;
059: import javax.transaction.xa.XAResource;
060:
061: import org.jboss.jms.ConnectionFactoryHelper;
062: import org.jboss.jms.jndi.JMSProviderAdapter;
063: import org.jboss.logging.Logger;
064: import org.jboss.resource.JBossResourceException;
065:
066: /**
067: * Managed Connection, manages one or more JMS sessions.
068: *
069: * <p>Every ManagedConnection will have a physical JMSConnection under the
070: * hood. This may leave out several session, as specifyed in 5.5.4 Multiple
071: * Connection Handles. Thread safe semantics is provided
072: *
073: * <p>Hm. If we are to follow the example in 6.11 this will not work. We would
074: * have to use the SAME session. This means we will have to guard against
075: * concurrent access. We use a stack, and only allowes the handle at the
076: * top of the stack to do things.
077: *
078: * <p>As to transactions we some fairly hairy alternatives to handle:
079: * XA - we get an XA. We may now only do transaction through the
080: * XAResource, since a XASession MUST throw exceptions in commit etc. But
081: * since XA support implies LocatTransaction support, we will have to use
082: * the XAResource in the LocalTransaction class.
083: * LocalTx - we get a normal session. The LocalTransaction will then work
084: * against the normal session api.
085: *
086: * <p>An invokation of JMS MAY BE DONE in none transacted context. What do we
087: * do then? How much should we leave to the user???
088: *
089: * <p>One possible solution is to use transactions any way, but under the hood.
090: * If not LocalTransaction or XA has been aquired by the container, we have
091: * to do the commit in send and publish. (CHECK is the container required
092: * to get a XA every time it uses a managed connection? No its is not, only
093: * at creation!)
094: *
095: * <p>Does this mean that a session one time may be used in a transacted env,
096: * and another time in a not transacted.
097: *
098: * <p>Maybe we could have this simple rule:
099: *
100: * <p>If a user is going to use non trans:
101: * <ul>
102: * <li>mark that i ra deployment descr
103: * <li>Use a JmsProviderAdapter with non XA factorys
104: * <li>Mark session as non transacted (this defeats the purpose of specifying
105: * <li>trans attrinbutes in deploy descr NOT GOOD
106: * </ul>
107: *
108: * <p>From the JMS tutorial:
109: * "When you create a session in an enterprise bean, the container ignores
110: * the arguments you specify, because it manages all transactional
111: * properties for enterprise beans."
112: *
113: * <p>And further:
114: * "You do not specify a message acknowledgment mode when you create a
115: * message-driven bean that uses container-managed transactions. The
116: * container handles acknowledgment automatically."
117: *
118: * <p>On Session or Connection:
119: * <p>From Tutorial:
120: * "A JMS API resource is a JMS API connection or a JMS API session." But in
121: * the J2EE spec only connection is considered a resource.
122: *
123: * <p>Not resolved: connectionErrorOccurred: it is verry hard to know from the
124: * exceptions thrown if it is a connection error. Should we register an
125: * ExceptionListener and mark al handles as errounous? And then let them
126: * send the event and throw an exception?
127: *
128: * @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
129: * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
130: * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
131: * @version $Revision: 57189 $
132: */
133: public class JmsManagedConnection implements ManagedConnection,
134: ExceptionListener {
135: private static final Logger log = Logger
136: .getLogger(JmsManagedConnection.class);
137:
138: private JmsManagedConnectionFactory mcf;
139: private JmsConnectionRequestInfo info;
140: private String user;
141: private String pwd;
142: private boolean isDestroyed;
143:
144: // Physical JMS connection stuff
145: private Connection con;
146: private Session session;
147: private TopicSession topicSession;
148: private QueueSession queueSession;
149: private XASession xaSession;
150: private XATopicSession xaTopicSession;
151: private XAQueueSession xaQueueSession;
152: private XAResource xaResource;
153: private boolean xaTransacted;
154:
155: /** Holds all current JmsSession handles. */
156: private Set handles = Collections.synchronizedSet(new HashSet());
157:
158: /** The event listeners */
159: private Vector listeners = new Vector();
160:
161: /**
162: * Create a <tt>JmsManagedConnection</tt>.
163: *
164: * @param mcf
165: * @param info
166: * @param user
167: * @param pwd
168: *
169: * @throws ResourceException
170: */
171: public JmsManagedConnection(final JmsManagedConnectionFactory mcf,
172: final ConnectionRequestInfo info, final String user,
173: final String pwd) throws ResourceException {
174: this .mcf = mcf;
175:
176: // seem like its asking for trouble here
177: this .info = (JmsConnectionRequestInfo) info;
178: this .user = user;
179: this .pwd = pwd;
180:
181: setup();
182: }
183:
184: //---- ManagedConnection API ----
185:
186: /**
187: * Get the physical connection handler.
188: *
189: * <p>This bummer will be called in two situations:
190: * <ol>
191: * <li>When a new mc has bean created and a connection is needed
192: * <li>When an mc has been fetched from the pool (returned in match*)
193: * </ol>
194: *
195: * <p>It may also be called multiple time without a cleanup, to support
196: * connection sharing.
197: *
198: * @param subject
199: * @param info
200: * @return A new connection object.
201: *
202: * @throws ResourceException
203: */
204: public Object getConnection(final Subject subject,
205: final ConnectionRequestInfo info) throws ResourceException {
206: // Check user first
207: JmsCred cred = JmsCred.getJmsCred(mcf, subject, info);
208:
209: // Null users are allowed!
210: if (user != null && !user.equals(cred.name))
211: throw new SecurityException(
212: "Password credentials not the same, reauthentication not allowed");
213: if (cred.name != null && user == null) {
214: throw new SecurityException(
215: "Password credentials not the same, reauthentication not allowed");
216: }
217:
218: user = cred.name; // Basically meaningless
219:
220: if (isDestroyed)
221: throw new IllegalStateException(
222: "ManagedConnection already destroyd");
223:
224: // Create a handle
225: JmsSession handle = new JmsSession(this ,
226: (JmsConnectionRequestInfo) info);
227: handles.add(handle);
228: return handle;
229: }
230:
231: /**
232: * Destroy all handles.
233: *
234: * @throws ResourceException Failed to close one or more handles.
235: */
236: private void destroyHandles() throws ResourceException {
237: try {
238: if (con != null)
239: con.stop();
240: } catch (Throwable t) {
241: log.trace("Ignored error stopping connection", t);
242: }
243:
244: Iterator iter = handles.iterator();
245: while (iter.hasNext())
246: ((JmsSession) iter.next()).destroy();
247:
248: // clear the handles map
249: handles.clear();
250: }
251:
252: /**
253: * Destroy the physical connection.
254: *
255: * @throws ResourceException Could not property close the session and
256: * connection.
257: */
258: public void destroy() throws ResourceException {
259: if (isDestroyed)
260: return;
261:
262: isDestroyed = true;
263:
264: try {
265: con.setExceptionListener(null);
266: } catch (JMSException e) {
267: log.debug("Error unsetting the exception listener " + this ,
268: e);
269: }
270:
271: // destory handles
272: destroyHandles();
273:
274: try {
275: // Close session and connection
276: try {
277: if (info.getType() == JmsConnectionFactory.TOPIC) {
278: topicSession.close();
279: if (xaTransacted) {
280: xaTopicSession.close();
281: }
282: } else if (info.getType() == JmsConnectionFactory.QUEUE) {
283: queueSession.close();
284: if (xaTransacted)
285: xaQueueSession.close();
286: } else {
287: session.close();
288: if (xaTransacted)
289: xaSession.close();
290: }
291: } catch (JMSException e) {
292: log.debug("Error closing session " + this , e);
293: }
294: con.close();
295: } catch (JMSException e) {
296: throw new JBossResourceException(
297: "Could not properly close the session and connection",
298: e);
299: }
300: }
301:
302: /**
303: * Cleans up the, from the spec
304: * - The cleanup of ManagedConnection instance resets its client specific
305: * state.
306: *
307: * Does that mean that autentication should be redone. FIXME
308: */
309: public void cleanup() throws ResourceException {
310: if (isDestroyed)
311: throw new IllegalStateException(
312: "ManagedConnection already destroyed");
313:
314: // destory handles
315: destroyHandles();
316: }
317:
318: /**
319: * Move a handler from one mc to this one.
320: *
321: * @param obj An object of type JmsSession.
322: *
323: * @throws ResourceException Failed to associate connection.
324: * @throws IllegalStateException ManagedConnection in an illegal state.
325: */
326: public void associateConnection(final Object obj)
327: throws ResourceException {
328: //
329: // Should we check auth, ie user and pwd? FIXME
330: //
331:
332: if (!isDestroyed && obj instanceof JmsSession) {
333: JmsSession h = (JmsSession) obj;
334: h.setManagedConnection(this );
335: handles.add(h);
336: } else
337: throw new IllegalStateException(
338: "ManagedConnection in an illegal state");
339: }
340:
341: /**
342: * Add a connection event listener.
343: *
344: * @param l The connection event listener to be added.
345: */
346: public void addConnectionEventListener(
347: final ConnectionEventListener l) {
348: listeners.addElement(l);
349:
350: if (log.isTraceEnabled())
351: log.trace("ConnectionEvent listener added: " + l);
352: }
353:
354: /**
355: * Remove a connection event listener.
356: *
357: * @param l The connection event listener to be removed.
358: */
359: public void removeConnectionEventListener(
360: final ConnectionEventListener l) {
361: listeners.removeElement(l);
362: }
363:
364: /**
365: * Get the XAResource for the connection.
366: *
367: * @return The XAResource for the connection.
368: *
369: * @throws ResourceException XA transaction not supported
370: */
371: public XAResource getXAResource() throws ResourceException {
372: //
373: // Spec says a mc must allways return the same XA resource,
374: // so we cache it.
375: //
376: if (!xaTransacted)
377: throw new NotSupportedException(
378: "Non XA transaction not supported");
379:
380: if (xaResource == null) {
381: if (info.getType() == JmsConnectionFactory.TOPIC)
382: xaResource = xaTopicSession.getXAResource();
383: else if (info.getType() == JmsConnectionFactory.QUEUE)
384: xaResource = xaQueueSession.getXAResource();
385: else
386: xaResource = xaSession.getXAResource();
387: }
388:
389: if (log.isTraceEnabled())
390: log.trace("XAResource=" + xaResource);
391:
392: return xaResource;
393: }
394:
395: /**
396: * Get the location transaction for the connection.
397: *
398: * @return The local transaction for the connection.
399: *
400: * @throws ResourceException
401: */
402: public LocalTransaction getLocalTransaction()
403: throws ResourceException {
404: LocalTransaction tx = new JmsLocalTransaction(this );
405: if (log.isTraceEnabled())
406: log.trace("LocalTransaction=" + tx);
407: return tx;
408: }
409:
410: /**
411: * Get the meta data for the connection.
412: *
413: * @return The meta data for the connection.
414: *
415: * @throws ResourceException
416: * @throws IllegalStateException ManagedConnection already destroyed.
417: */
418: public ManagedConnectionMetaData getMetaData()
419: throws ResourceException {
420: if (isDestroyed)
421: throw new IllegalStateException(
422: "ManagedConnection already destroyd");
423:
424: return new JmsMetaData(this );
425: }
426:
427: /**
428: * Set the log writer for this connection.
429: *
430: * @param out The log writer for this connection.
431: *
432: * @throws ResourceException
433: */
434: public void setLogWriter(final PrintWriter out)
435: throws ResourceException {
436: //
437: // jason: screw the logWriter stuff for now it sucks ass
438: //
439: }
440:
441: /**
442: * Get the log writer for this connection.
443: *
444: * @return Always null
445: */
446: public PrintWriter getLogWriter() throws ResourceException {
447: //
448: // jason: screw the logWriter stuff for now it sucks ass
449: //
450:
451: return null;
452: }
453:
454: // --- Exception listener implementation
455:
456: public void onException(JMSException exception) {
457: if (isDestroyed) {
458: if (log.isTraceEnabled())
459: log.trace(
460: "Ignoring error on already destroyed connection "
461: + this , exception);
462: return;
463: }
464:
465: log.warn("Handling jms exception failure: " + this , exception);
466:
467: try {
468: con.setExceptionListener(null);
469: } catch (JMSException e) {
470: log.debug("Unable to unset exception listener", e);
471: }
472:
473: ConnectionEvent event = new ConnectionEvent(this ,
474: ConnectionEvent.CONNECTION_ERROR_OCCURRED, exception);
475: sendEvent(event);
476: }
477:
478: // --- Api to JmsSession
479:
480: /**
481: * Get the session for this connection.
482: *
483: * @return Either a topic or queue connection.
484: */
485: protected Session getSession() {
486: if (info.getType() == JmsConnectionFactory.TOPIC)
487: return topicSession;
488: else if (info.getType() == JmsConnectionFactory.QUEUE)
489: return queueSession;
490: else
491: return session;
492: }
493:
494: /**
495: * Send an event.
496: *
497: * @param event The event to send.
498: */
499: protected void sendEvent(final ConnectionEvent event) {
500: int type = event.getId();
501:
502: if (log.isTraceEnabled())
503: log.trace("Sending connection event: " + type);
504:
505: // convert to an array to avoid concurrent modification exceptions
506: ConnectionEventListener[] list = (ConnectionEventListener[]) listeners
507: .toArray(new ConnectionEventListener[listeners.size()]);
508:
509: for (int i = 0; i < list.length; i++) {
510: switch (type) {
511: case ConnectionEvent.CONNECTION_CLOSED:
512: list[i].connectionClosed(event);
513: break;
514:
515: case ConnectionEvent.LOCAL_TRANSACTION_STARTED:
516: list[i].localTransactionStarted(event);
517: break;
518:
519: case ConnectionEvent.LOCAL_TRANSACTION_COMMITTED:
520: list[i].localTransactionCommitted(event);
521: break;
522:
523: case ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK:
524: list[i].localTransactionRolledback(event);
525: break;
526:
527: case ConnectionEvent.CONNECTION_ERROR_OCCURRED:
528: list[i].connectionErrorOccurred(event);
529: break;
530:
531: default:
532: throw new IllegalArgumentException(
533: "Illegal eventType: " + type);
534: }
535: }
536: }
537:
538: /**
539: * Remove a handle from the handle map.
540: *
541: * @param handle The handle to remove.
542: */
543: protected void removeHandle(final JmsSession handle) {
544: handles.remove(handle);
545: }
546:
547: // --- Used by MCF
548:
549: /**
550: * Get the request info for this connection.
551: *
552: * @return The request info for this connection.
553: */
554: protected ConnectionRequestInfo getInfo() {
555: return info;
556: }
557:
558: /**
559: * Get the connection factory for this connection.
560: *
561: * @return The connection factory for this connection.
562: */
563: protected JmsManagedConnectionFactory getManagedConnectionFactory() {
564: return mcf;
565: }
566:
567: void start() throws JMSException {
568: con.start();
569: }
570:
571: void stop() throws JMSException {
572: con.stop();
573: }
574:
575: // --- Used by MetaData
576:
577: /**
578: * Get the user name for this connection.
579: *
580: * @return The user name for this connection.
581: */
582: protected String getUserName() {
583: return user;
584: }
585:
586: // --- Private helper methods
587:
588: /**
589: * Get the JMS provider adapter that will be used to create JMS
590: * resources.
591: *
592: * @return A JMS provider adapter.
593: *
594: * @throws NamingException Failed to lookup provider adapter.
595: */
596: private JMSProviderAdapter getProviderAdapter()
597: throws NamingException {
598: JMSProviderAdapter adapter;
599:
600: if (mcf.getJmsProviderAdapterJNDI() != null) {
601: // lookup the adapter from JNDI
602: Context ctx = new InitialContext();
603: try {
604: adapter = (JMSProviderAdapter) ctx.lookup(mcf
605: .getJmsProviderAdapterJNDI());
606: } finally {
607: ctx.close();
608: }
609: } else
610: adapter = mcf.getJmsProviderAdapter();
611:
612: return adapter;
613: }
614:
615: /**
616: * Setup the connection.
617: *
618: * @throws ResourceException
619: */
620: private void setup() throws ResourceException {
621: boolean trace = log.isTraceEnabled();
622:
623: try {
624: JMSProviderAdapter adapter = getProviderAdapter();
625: Context context = adapter.getInitialContext();
626: Object factory;
627: boolean transacted = info.isTransacted();
628: int ack = Session.AUTO_ACKNOWLEDGE;
629:
630: if (info.getType() == JmsConnectionFactory.TOPIC) {
631: String jndi = adapter.getTopicFactoryRef();
632: if (jndi == null)
633: throw new IllegalStateException(
634: "No configured 'TopicFactoryRef' on the jms provider "
635: + mcf.getJmsProviderAdapterJNDI());
636: factory = context.lookup(jndi);
637: con = ConnectionFactoryHelper.createTopicConnection(
638: factory, user, pwd);
639: if (info.getClientID() != null)
640: con.setClientID(info.getClientID());
641: con.setExceptionListener(this );
642: if (trace)
643: log.trace("created connection: " + con);
644:
645: if (con instanceof XATopicConnection) {
646: xaTopicSession = ((XATopicConnection) con)
647: .createXATopicSession();
648: topicSession = xaTopicSession.getTopicSession();
649: xaTransacted = true;
650: } else if (con instanceof TopicConnection) {
651: topicSession = ((TopicConnection) con)
652: .createTopicSession(transacted, ack);
653: if (trace)
654: log
655: .trace("Using a non-XA TopicConnection. "
656: + "It will not be able to participate in a Global UOW");
657: } else
658: throw new JBossResourceException(
659: "Connection was not recognizable: " + con);
660:
661: if (trace)
662: log.trace("xaTopicSession=" + xaTopicSession
663: + ", topicSession=" + topicSession);
664: } else if (info.getType() == JmsConnectionFactory.QUEUE) {
665: String jndi = adapter.getQueueFactoryRef();
666: if (jndi == null)
667: throw new IllegalStateException(
668: "No configured 'QueueFactoryRef' on the jms provider "
669: + mcf.getJmsProviderAdapterJNDI());
670: factory = context.lookup(jndi);
671: con = ConnectionFactoryHelper.createQueueConnection(
672: factory, user, pwd);
673: if (info.getClientID() != null)
674: con.setClientID(info.getClientID());
675: con.setExceptionListener(this );
676: if (trace)
677: log.debug("created connection: " + con);
678:
679: if (con instanceof XAQueueConnection) {
680: xaQueueSession = ((XAQueueConnection) con)
681: .createXAQueueSession();
682: queueSession = xaQueueSession.getQueueSession();
683: xaTransacted = true;
684: } else if (con instanceof QueueConnection) {
685: queueSession = ((QueueConnection) con)
686: .createQueueSession(transacted, ack);
687: if (trace)
688: log
689: .trace("Using a non-XA QueueConnection. "
690: + "It will not be able to participate in a Global UOW");
691: } else
692: throw new JBossResourceException(
693: "Connection was not reconizable: " + con);
694:
695: if (trace)
696: log.trace("xaQueueSession=" + xaQueueSession
697: + ", queueSession=" + queueSession);
698: } else {
699: String jndi = adapter.getFactoryRef();
700: if (jndi == null)
701: throw new IllegalStateException(
702: "No configured 'FactoryRef' on the jms provider "
703: + mcf.getJmsProviderAdapterJNDI());
704: factory = context.lookup(jndi);
705: con = ConnectionFactoryHelper.createConnection(factory,
706: user, pwd);
707: if (info.getClientID() != null)
708: con.setClientID(info.getClientID());
709: con.setExceptionListener(this );
710: if (trace)
711: log.trace("created connection: " + con);
712:
713: if (con instanceof XAConnection) {
714: xaSession = ((XAConnection) con).createXASession();
715: session = xaSession.getSession();
716: xaTransacted = true;
717: } else {
718: session = con.createSession(transacted, ack);
719: if (trace)
720: log
721: .trace("Using a non-XA Connection. "
722: + "It will not be able to participate in a Global UOW");
723: }
724:
725: if (trace)
726: log.debug("xaSession=" + xaQueueSession
727: + ", Session=" + session);
728: }
729:
730: if (trace)
731: log.debug("transacted=" + transacted + ", ack=" + ack);
732: } catch (NamingException e) {
733: throw new JBossResourceException(
734: "Unable to setup connection", e);
735: } catch (JMSException e) {
736: throw new JBossResourceException(
737: "Unable to setup connection", e);
738: }
739: }
740: }
|