001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.jms.asf;
023:
024: import javax.jms.JMSException;
025: import javax.jms.Message;
026: import javax.jms.MessageListener;
027: import javax.jms.ServerSession;
028: import javax.jms.Session;
029: import javax.jms.XASession;
030: import javax.naming.InitialContext;
031: import javax.transaction.Status;
032: import javax.transaction.Transaction;
033: import javax.transaction.TransactionManager;
034: import javax.transaction.xa.XAResource;
035: import javax.transaction.xa.Xid;
036: import org.jboss.logging.Logger;
037: import org.jboss.tm.TransactionManagerService;
038: import org.jboss.tm.XidFactoryMBean;
039:
040: /**
041: * An implementation of ServerSession. <p>
042: *
043: * @author <a href="mailto:peter.antman@tim.se">Peter Antman</a> .
044: * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
045: * @author <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a> .
046: * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
047: * @version $Revision: 57209 $
048: */
049: public class StdServerSession implements Runnable, ServerSession,
050: MessageListener {
051: /** Instance logger. */
052: static Logger log = Logger.getLogger(StdServerSession.class);
053:
054: /** The server session pool which we belong to. */
055: private StdServerSessionPool serverSessionPool;
056:
057: /** Our session resource. */
058: private Session session;
059:
060: /** Our XA session resource. */
061: private XASession xaSession;
062:
063: /** The transaction manager that we will use for transactions. */
064: private TransactionManager tm;
065:
066: /**
067: * Use the session's XAResource directly if we have an JBossMQ XASession.
068: * this allows us to get around the TX timeout problem when you have
069: * extensive message processing.
070: */
071: private boolean useLocalTX;
072:
073: /** The listener to delegate calls, to. In our case the container invoker. */
074: private MessageListener delegateListener;
075:
076: private XidFactoryMBean xidFactory;
077:
078: /**
079: * @deprecated
080: * @todo these appeared in jboss-head where are they used?
081: */
082: public TransactionManager getTransactionManager() {
083: return tm;
084: }
085:
086: /**
087: * @deprecated
088: * @todo these appeared in jboss-head where are they used?
089: */
090: public void setTransactionManager(
091: TransactionManager transactionManager) {
092: this .tm = transactionManager;
093: }
094:
095: /**
096: * Create a <tt>StdServerSession</tt> .
097: *
098: * @param pool The server session pool which we belong to.
099: * @param session Our session resource.
100: * @param xaSession Our XA session resource.
101: * @param delegateListener Listener to call when messages arrives.
102: * @param useLocalTX Will this session be used in a global TX (we can optimize with 1 phase commit)
103: * @throws JMSException Transation manager was not found.
104: */
105: StdServerSession(final StdServerSessionPool pool,
106: final Session session, final XASession xaSession,
107: final MessageListener delegateListener, boolean useLocalTX,
108: final XidFactoryMBean xidFactory,
109: final TransactionManager tm) throws JMSException {
110: this .serverSessionPool = pool;
111: this .session = session;
112: this .xaSession = xaSession;
113: this .delegateListener = delegateListener;
114: if (xaSession == null)
115: useLocalTX = false;
116: this .useLocalTX = useLocalTX;
117: this .xidFactory = xidFactory;
118: this .tm = tm;
119:
120: log
121: .debug("initializing (pool, session, xaSession, useLocalTX): "
122: + pool
123: + ", "
124: + session
125: + ", "
126: + xaSession
127: + ", " + useLocalTX);
128:
129: // Set out self as message listener
130: if (xaSession != null)
131: xaSession.setMessageListener(this );
132: else
133: session.setMessageListener(this );
134:
135: if (tm == null) {
136: InitialContext ctx = null;
137: try {
138: ctx = new InitialContext();
139: this .tm = (TransactionManager) ctx
140: .lookup(TransactionManagerService.JNDI_NAME);
141: } catch (Exception e) {
142: throw new JMSException(
143: "Transation manager was not found");
144: } finally {
145: if (ctx != null) {
146: try {
147: ctx.close();
148: } catch (Exception ignore) {
149: }
150: }
151: }
152: }
153: }
154:
155: /**
156: * Returns the session. <p>
157: * <p/>
158: * This simply returns what it has fetched from the connection. It is up to
159: * the jms provider to typecast it and have a private API to stuff messages
160: * into it.
161: *
162: * @return The session.
163: * @throws JMSException Description of Exception
164: */
165: public Session getSession() throws JMSException {
166: if (xaSession != null)
167: return xaSession;
168: else
169: return session;
170: }
171:
172: /**
173: * Runs in an own thread, basically calls the session.run(), it is up to the
174: * session to have been filled with messages and it will run against the
175: * listener set in StdServerSessionPool. When it has send all its messages it
176: * returns.
177: */
178: public void run() {
179: boolean trace = log.isTraceEnabled();
180: if (trace)
181: log.trace("running...");
182: try {
183: if (xaSession != null)
184: xaSession.run();
185: else
186: session.run();
187: } finally {
188: if (trace)
189: log.trace("recycling...");
190:
191: recycle();
192:
193: if (trace)
194: log.trace("finished run");
195: }
196: }
197:
198: /**
199: * Will get called from session for each message stuffed into it.
200: * <p/>
201: * Starts a transaction with the TransactionManager
202: * and enlists the XAResource of the JMS XASession if a XASession was
203: * available. A good JMS implementation should provide the XASession for use
204: * in the ASF. So we optimize for the case where we have an XASession. So,
205: * for the case where we do not have an XASession and the bean is not
206: * transacted, we have the unneeded overhead of creating a Transaction. I'm
207: * leaving it this way since it keeps the code simpler and that case should
208: * not be too common (JBossMQ provides XASessions).
209: */
210: public void onMessage(Message msg) {
211: boolean trace = log.isTraceEnabled();
212: if (trace)
213: log
214: .trace("onMessage running (pool, session, xaSession, useLocalTX): "
215: + ", "
216: + session
217: + ", "
218: + xaSession
219: + ", "
220: + useLocalTX);
221:
222: // Used if run with useLocalTX if true
223: Xid localXid = null;
224: boolean localRollbackFlag = false;
225: // Used if run with useLocalTX if false
226: Transaction trans = null;
227: try {
228:
229: if (useLocalTX) {
230: // Use JBossMQ One Phase Commit to commit the TX
231: localXid = xidFactory.newXid();//new XidImpl();
232: XAResource res = xaSession.getXAResource();
233: res.start(localXid, XAResource.TMNOFLAGS);
234:
235: if (trace)
236: log
237: .trace("Using optimized 1p commit to control TX.");
238: } else {
239:
240: // Use the TM to control the TX
241: tm.begin();
242: trans = tm.getTransaction();
243:
244: if (xaSession != null) {
245: XAResource res = xaSession.getXAResource();
246: if (!trans.enlistResource(res)) {
247: throw new JMSException(
248: "could not enlist resource");
249: }
250: if (trace)
251: log.trace("XAResource '" + res + "' enlisted.");
252: }
253: }
254: // Call delegate listener
255: delegateListener.onMessage(msg);
256: } catch (Exception e) {
257: log
258: .error(
259: "session failed to run; setting rollback only",
260: e);
261:
262: if (useLocalTX) {
263: // Use JBossMQ One Phase Commit to commit the TX
264: localRollbackFlag = true;
265: } else {
266: // Mark for tollback TX via TM
267: try {
268: // The transaction will be rolledback in the finally
269: if (trace)
270: log.trace("Using TM to mark TX for rollback.");
271: trans.setRollbackOnly();
272: } catch (Exception x) {
273: log.error("failed to set rollback only", x);
274: }
275: }
276:
277: } finally {
278: try {
279: if (useLocalTX) {
280: if (localRollbackFlag == true) {
281: if (trace)
282: log
283: .trace("Using optimized 1p commit to rollback TX.");
284:
285: XAResource res = xaSession.getXAResource();
286: res.end(localXid, XAResource.TMSUCCESS);
287: res.rollback(localXid);
288:
289: } else {
290: if (trace)
291: log
292: .trace("Using optimized 1p commit to commit TX.");
293:
294: XAResource res = xaSession.getXAResource();
295: res.end(localXid, XAResource.TMSUCCESS);
296: res.commit(localXid, true);
297: }
298: } else {
299: // Use the TM to commit the Tx (assert the correct association)
300: Transaction currentTx = tm.getTransaction();
301: if (trans.equals(currentTx) == false)
302: throw new IllegalStateException(
303: "Wrong tx association: expected "
304: + trans + " was " + currentTx);
305:
306: // Marked rollback
307: if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
308: if (trace)
309: log.trace("Rolling back JMS transaction");
310: // actually roll it back
311: tm.rollback();
312:
313: // NO XASession? then manually rollback.
314: // This is not so good but
315: // it's the best we can do if we have no XASession.
316: if (xaSession == null
317: && serverSessionPool.isTransacted()) {
318: session.rollback();
319: }
320: } else if (trans.getStatus() == Status.STATUS_ACTIVE) {
321: // Commit tx
322: // This will happen if
323: // a) everything goes well
324: // b) app. exception was thrown
325: if (trace)
326: log.trace("Commiting the JMS transaction");
327: tm.commit();
328:
329: // NO XASession? then manually commit. This is not so good but
330: // it's the best we can do if we have no XASession.
331: if (xaSession == null
332: && serverSessionPool.isTransacted()) {
333: session.commit();
334: }
335: } else {
336: if (trace)
337: log.trace(StdServerSession.this
338: + "transaction already ended");
339:
340: tm.suspend();
341:
342: if (xaSession == null
343: && serverSessionPool.isTransacted()) {
344: session.rollback();
345: }
346:
347: }
348: }
349: } catch (Exception e) {
350: log.error("failed to commit/rollback", e);
351: }
352: }
353: if (trace)
354: log.trace("onMessage done");
355: }
356:
357: /**
358: * Start the session and begin consuming messages.
359: *
360: * @throws JMSException No listener has been specified.
361: */
362: public void start() throws JMSException {
363: log.trace("starting invokes on server session");
364:
365: if (session != null) {
366: try {
367: serverSessionPool.getExecutor().execute(this );
368: } catch (InterruptedException ignore) {
369: }
370: } else {
371: throw new JMSException("No listener has been specified");
372: }
373: }
374:
375: /**
376: * Called by the ServerSessionPool when the sessions should be closed.
377: */
378: void close() {
379: if (session != null) {
380: try {
381: session.close();
382: } catch (Exception ignore) {
383: }
384:
385: session = null;
386: }
387:
388: if (xaSession != null) {
389: try {
390: xaSession.close();
391: } catch (Exception ignore) {
392: }
393: xaSession = null;
394: }
395:
396: log.debug("closed");
397: }
398:
399: /**
400: * This method is called by the ServerSessionPool when it is ready to be
401: * recycled intot the pool
402: */
403: void recycle() {
404: serverSessionPool.recycle(this);
405: }
406: }
|