001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.adapter.jms.inflow;
023:
024: import javax.jms.Connection;
025: import javax.jms.JMSException;
026: import javax.jms.Message;
027: import javax.jms.MessageListener;
028: import javax.jms.ServerSession;
029: import javax.jms.Session;
030: import javax.jms.XAConnection;
031: import javax.jms.XASession;
032: import javax.resource.spi.endpoint.MessageEndpoint;
033: import javax.resource.spi.endpoint.MessageEndpointFactory;
034: import javax.resource.spi.work.Work;
035: import javax.resource.spi.work.WorkEvent;
036: import javax.resource.spi.work.WorkException;
037: import javax.resource.spi.work.WorkListener;
038: import javax.resource.spi.work.WorkManager;
039: import javax.transaction.Status;
040: import javax.transaction.Transaction;
041: import javax.transaction.TransactionManager;
042: import javax.transaction.xa.XAResource;
043:
044: import org.jboss.logging.Logger;
045: import org.jboss.resource.connectionmanager.xa.JcaXAResourceWrapperFactory;
046:
047: /**
048: * A generic jms session pool.
049: *
050: * @author <a href="adrian@jboss.com">Adrian Brock</a>
051: * @author <a href="mailto:weston.price@jboss.com>Weston Price</a>
052: * @version $Revision: 60926 $
053: */
054: public class JmsServerSession implements ServerSession,
055: MessageListener, Work, WorkListener {
056: /** The log */
057: private static final Logger log = Logger
058: .getLogger(JmsServerSession.class);
059:
060: /** The session pool */
061: JmsServerSessionPool pool;
062:
063: /** The transacted flag */
064: boolean transacted;
065:
066: /** The acknowledge mode */
067: int acknowledge;
068:
069: /** The session */
070: Session session;
071:
072: /** Any XA session */
073: XASession xaSession;
074:
075: /** The endpoint */
076: MessageEndpoint endpoint;
077:
078: /** Any DLQ handler */
079: DLQHandler dlqHandler;
080:
081: TransactionDemarcationStrategy txnStrategy;
082:
083: /**
084: * Create a new JmsServerSession
085: *
086: * @param pool the server session pool
087: */
088: public JmsServerSession(JmsServerSessionPool pool) {
089: this .pool = pool;
090:
091: }
092:
093: /**
094: * Setup the session
095: */
096: public void setup() throws Exception {
097: JmsActivation activation = pool.getActivation();
098: JmsActivationSpec spec = activation.getActivationSpec();
099:
100: dlqHandler = activation.getDLQHandler();
101:
102: Connection connection = activation.getConnection();
103:
104: // Create the session
105: if (connection instanceof XAConnection
106: && activation.isDeliveryTransacted()) {
107: xaSession = ((XAConnection) connection).createXASession();
108: session = xaSession.getSession();
109: } else {
110: transacted = spec.isSessionTransacted();
111: acknowledge = spec.getAcknowledgeModeInt();
112: session = connection.createSession(transacted, acknowledge);
113: }
114:
115: // Get the endpoint
116: MessageEndpointFactory endpointFactory = activation
117: .getMessageEndpointFactory();
118: XAResource xaResource = null;
119:
120: if (activation.isDeliveryTransacted() && xaSession != null)
121: xaResource = xaSession.getXAResource();
122:
123: endpoint = endpointFactory.createEndpoint(xaResource);
124:
125: // Set the message listener
126: session.setMessageListener(this );
127: }
128:
129: /**
130: * Stop the session
131: */
132: public void teardown() {
133: try {
134: if (endpoint != null)
135: endpoint.release();
136: } catch (Throwable t) {
137: log.debug("Error releasing endpoint " + endpoint, t);
138: }
139:
140: try {
141: if (xaSession != null)
142: xaSession.close();
143: } catch (Throwable t) {
144: log.debug("Error releasing xaSession " + xaSession, t);
145: }
146:
147: try {
148: if (session != null)
149: session.close();
150: } catch (Throwable t) {
151: log.debug("Error releasing session " + session, t);
152: }
153: }
154:
155: public void onMessage(Message message) {
156: try {
157: endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
158:
159: try {
160: if (dlqHandler == null
161: || dlqHandler.handleRedeliveredMessage(message) == false) {
162: MessageListener listener = (MessageListener) endpoint;
163: listener.onMessage(message);
164: }
165: } finally {
166: endpoint.afterDelivery();
167:
168: if (dlqHandler != null)
169: dlqHandler.messageDelivered(message);
170: }
171: }
172:
173: catch (Throwable t) {
174: log.error("Unexpected error delivering message " + message,
175: t);
176:
177: if (txnStrategy != null)
178: txnStrategy.error();
179:
180: }
181:
182: }
183:
184: public Session getSession() throws JMSException {
185: return session;
186: }
187:
188: public void start() throws JMSException {
189: JmsActivation activation = pool.getActivation();
190: WorkManager workManager = activation.getWorkManager();
191: try {
192: workManager.scheduleWork(this , 0, null, this );
193: } catch (WorkException e) {
194: log.error("Unable to schedule work", e);
195: throw new JMSException("Unable to schedule work: "
196: + e.toString());
197: }
198: }
199:
200: public void run() {
201:
202: try {
203: txnStrategy = createTransactionDemarcation();
204:
205: } catch (Throwable t) {
206: log
207: .error("Error creating transaction demarcation. Cannot continue.");
208: return;
209: }
210:
211: try {
212: session.run();
213: } catch (Throwable t) {
214: if (txnStrategy != null)
215: txnStrategy.error();
216:
217: } finally {
218: if (txnStrategy != null)
219: txnStrategy.end();
220:
221: txnStrategy = null;
222: }
223:
224: }
225:
226: private TransactionDemarcationStrategy createTransactionDemarcation() {
227: return new DemarcationStrategyFactory().getStrategy();
228:
229: }
230:
231: public void release() {
232: }
233:
234: public void workAccepted(WorkEvent e) {
235: }
236:
237: public void workCompleted(WorkEvent e) {
238: pool.returnServerSession(this );
239: }
240:
241: public void workRejected(WorkEvent e) {
242: pool.returnServerSession(this );
243: }
244:
245: public void workStarted(WorkEvent e) {
246: }
247:
248: private class DemarcationStrategyFactory {
249:
250: TransactionDemarcationStrategy getStrategy() {
251: TransactionDemarcationStrategy current = null;
252: final JmsActivationSpec spec = pool.getActivation()
253: .getActivationSpec();
254: final JmsActivation activation = pool.getActivation();
255:
256: if (activation.isDeliveryTransacted() && xaSession != null) {
257: try {
258: current = new XATransactionDemarcationStrategy();
259: } catch (Throwable t) {
260: log
261: .error(
262: this
263: + " error creating transaction demarcation ",
264: t);
265: }
266:
267: } else {
268:
269: return new LocalDemarcationStrategy();
270:
271: }
272:
273: return current;
274: }
275:
276: }
277:
278: private interface TransactionDemarcationStrategy {
279: void error();
280:
281: void end();
282:
283: }
284:
285: private class LocalDemarcationStrategy implements
286: TransactionDemarcationStrategy {
287: public void end() {
288: final JmsActivationSpec spec = pool.getActivation()
289: .getActivationSpec();
290:
291: if (spec.isSessionTransacted()) {
292: if (session != null) {
293: try {
294: session.commit();
295: } catch (JMSException e) {
296: log.error(
297: "Failed to commit session transaction",
298: e);
299: }
300: }
301: }
302: }
303:
304: public void error() {
305: final JmsActivationSpec spec = pool.getActivation()
306: .getActivationSpec();
307:
308: if (spec.isSessionTransacted()) {
309: if (session != null)
310:
311: try {
312: /*
313: * Looks strange, but this basically means
314: *
315: * If the underlying connection was non-XA and the transaction attribute is REQUIRED
316: * we rollback. Also, if the underlying connection was non-XA and the transaction
317: * attribute is NOT_SUPPORT and the non standard redelivery behavior is enabled
318: * we rollback to force redelivery.
319: *
320: */
321: if (pool.getActivation().isDeliveryTransacted()
322: || spec.getRedeliverUnspecified()) {
323: session.rollback();
324: }
325:
326: } catch (JMSException e) {
327: log
328: .error(
329: "Failed to rollback session transaction",
330: e);
331: }
332:
333: }
334: }
335:
336: }
337:
338: private class XATransactionDemarcationStrategy implements
339: TransactionDemarcationStrategy {
340:
341: boolean trace = log.isTraceEnabled();
342:
343: Transaction trans = null;
344:
345: TransactionManager tm = pool.getActivation()
346: .getTransactionManager();;
347:
348: public XATransactionDemarcationStrategy() throws Throwable {
349:
350: final int timeout = pool.getActivation()
351: .getActivationSpec().getTransactionTimeout();
352:
353: if (timeout > 0) {
354: log
355: .trace("Setting transactionTimeout for JMSSessionPool to "
356: + timeout);
357: tm.setTransactionTimeout(timeout);
358:
359: }
360:
361: tm.begin();
362:
363: try {
364: trans = tm.getTransaction();
365:
366: if (trace)
367: log.trace(JmsServerSession.this + " using tx="
368: + trans);
369:
370: if (xaSession != null) {
371: XAResource res = JcaXAResourceWrapperFactory
372: .getResourceWrapper(xaSession
373: .getXAResource(), pool
374: .getActivation()
375: .getActivationSpec()
376: .getIsSameRMOverrideValue());
377:
378: if (!trans.enlistResource(res)) {
379: throw new JMSException(
380: "could not enlist resource");
381: }
382: if (trace)
383: log
384: .trace(JmsServerSession.this
385: + " XAResource '" + res
386: + "' enlisted.");
387: }
388: } catch (Throwable t) {
389: try {
390: tm.rollback();
391: } catch (Throwable ignored) {
392: log
393: .trace(
394: JmsServerSession.this
395: + " ignored error rolling back after failed enlist",
396: ignored);
397: }
398: throw t;
399: }
400:
401: }
402:
403: public void error() {
404: // Mark for tollback TX via TM
405: try {
406:
407: if (trace) {
408: log.trace(JmsServerSession.this
409: + " using TM to mark TX for rollback tx="
410: + trans);
411:
412: }
413:
414: trans.setRollbackOnly();
415: } catch (Throwable t) {
416: log.error(JmsServerSession.this
417: + " failed to set rollback only", t);
418: }
419:
420: }
421:
422: public void end() {
423: try {
424:
425: // Use the TM to commit the Tx (assert the correct association)
426: Transaction currentTx = tm.getTransaction();
427: if (trans.equals(currentTx) == false)
428: throw new IllegalStateException(
429: "Wrong tx association: expected " + trans
430: + " was " + currentTx);
431:
432: // Marked rollback
433: if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
434: if (trace)
435: log.trace(JmsServerSession.this
436: + " rolling back JMS transaction tx="
437: + trans);
438: // actually roll it back
439: tm.rollback();
440:
441: // NO XASession? then manually rollback.
442: // This is not so good but
443: // it's the best we can do if we have no XASession.
444: if (xaSession == null
445: && pool.getActivation()
446: .isDeliveryTransacted()) {
447: session.rollback();
448: }
449: }
450:
451: else if (trans.getStatus() == Status.STATUS_ACTIVE) {
452: // Commit tx
453: // This will happen if
454: // a) everything goes well
455: // b) app. exception was thrown
456: if (trace)
457: log.trace(JmsServerSession.this
458: + " commiting the JMS transaction tx="
459: + trans);
460: tm.commit();
461:
462: // NO XASession? then manually commit. This is not so good but
463: // it's the best we can do if we have no XASession.
464: if (xaSession == null
465: && pool.getActivation()
466: .isDeliveryTransacted()) {
467: session.commit();
468: }
469:
470: } else {
471: tm.suspend();
472:
473: if (xaSession == null
474: && pool.getActivation()
475: .isDeliveryTransacted()) {
476: session.rollback();
477: }
478:
479: }
480:
481: } catch (Throwable t) {
482: log.error(JmsServerSession.this
483: + " failed to commit/rollback", t);
484: }
485:
486: }
487:
488: }
489: }
|