001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ejb.plugins.inflow;
023:
024: import java.lang.reflect.Method;
025:
026: import javax.resource.ResourceException;
027: import javax.transaction.Status;
028: import javax.transaction.Transaction;
029: import javax.transaction.TransactionManager;
030: import javax.transaction.xa.XAResource;
031:
032: import org.jboss.ejb.MessageDrivenContainer;
033: import org.jboss.invocation.Invocation;
034: import org.jboss.logging.Logger;
035: import org.jboss.proxy.Interceptor;
036:
037: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
038:
039: /**
040: * Implements the application server message endpoint requirements.
041: *
042: * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
043: * @version $Revision: 60786 $
044: */
045: public class MessageEndpointInterceptor extends Interceptor {
046: /** The serialVersionUID */
047: private static final long serialVersionUID = -8740717288847385688L;
048:
049: // Constants -----------------------------------------------------
050:
051: /** The log */
052: private static final Logger log = Logger
053: .getLogger(MessageEndpointInterceptor.class);
054:
055: /** The key for the factory */
056: public static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpoint.Factory";
057:
058: /** The key for the xa resource */
059: public static final String MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource";
060:
061: // Attributes ----------------------------------------------------
062:
063: /** Whether trace is enabled */
064: private boolean trace = log.isTraceEnabled();
065:
066: /** Cached version of our proxy string */
067: private String cachedProxyString = null;
068:
069: /** Whether this proxy has been released */
070: protected SynchronizedBoolean released = new SynchronizedBoolean(
071: false);
072:
073: /** Whether we have delivered a message */
074: protected boolean delivered = false;
075:
076: /** The in use thread */
077: protected Thread inUseThread = null;
078:
079: /** The old classloader of the thread */
080: protected ClassLoader oldClassLoader = null;
081:
082: /** Any transaction we started */
083: protected Transaction transaction = null;
084:
085: /** Any suspended transaction */
086: protected Transaction suspended = null;
087:
088: /** The beforeDeliveryInvoked used to identify sequence of before/after invocation*/
089: protected boolean beforeDeliveryInvoked = false;
090:
091: /** The message endpoint factory */
092: private JBossMessageEndpointFactory endpointFactory;
093:
094: // Static --------------------------------------------------------
095:
096: // Constructors --------------------------------------------------
097:
098: public MessageEndpointInterceptor() {
099: }
100:
101: // Public --------------------------------------------------------
102:
103: // Interceptor implementation ------------------------------------
104:
105: public Object invoke(Invocation mi) throws Throwable {
106: // Are we still useable?
107: if (released.get())
108: throw new IllegalStateException("This message endpoint + "
109: + getProxyString(mi) + " has been released");
110:
111: // Concurrent invocation?
112: Thread currentThread = Thread.currentThread();
113: if (inUseThread != null
114: && inUseThread.equals(currentThread) == false)
115: throw new IllegalStateException("This message endpoint + "
116: + getProxyString(mi)
117: + " is already in use by another thread "
118: + inUseThread);
119: inUseThread = currentThread;
120:
121: String method = mi.getMethod().getName();
122: if (trace)
123: log.trace("MessageEndpoint " + getProxyString(mi)
124: + " in use by " + method + " " + inUseThread);
125:
126: // Which operation?
127: if (method.equals("release")) {
128: release(mi);
129: return null;
130: } else if (method.equals("beforeDelivery")) {
131: before(mi);
132: return null;
133: } else if (method.equals("afterDelivery")) {
134: after(mi);
135: return null;
136: } else
137: return delivery(mi);
138: }
139:
140: // Package Protected ---------------------------------------------
141:
142: // Protected -----------------------------------------------------
143:
144: /**
145: * Release this message endpoint.
146: *
147: * @param mi the invocation
148: * @throws Throwable for any error
149: */
150: protected void release(Invocation mi) throws Throwable {
151: // We are now released
152: released.set(true);
153:
154: if (trace)
155: log.trace("MessageEndpoint " + getProxyString(mi)
156: + " released");
157:
158: // Tidyup any outstanding delivery
159: if (oldClassLoader != null) {
160: try {
161: finish("release", mi, false);
162: } catch (Throwable t) {
163: log.warn("Error in release ", t);
164: }
165: }
166: }
167:
168: /**
169: * Before delivery processing.
170: *
171: * @param mi the invocation
172: * @throws Throwable for any error
173: */
174: protected void before(Invocation mi) throws Throwable {
175: // Called out of sequence
176: if (getBeforeDeliveryInvoke())
177: throw new IllegalStateException(
178: "Missing afterDelivery from the previous beforeDelivery for message endpoint "
179: + getProxyString(mi));
180:
181: // Set the classloader
182: MessageDrivenContainer container = getContainer(mi);
183: oldClassLoader = GetTCLAction
184: .getContextClassLoader(inUseThread);
185: SetTCLAction.setContextClassLoader(inUseThread, container
186: .getClassLoader());
187: if (trace)
188: log.trace("MessageEndpoint " + getProxyString(mi)
189: + " set context classloader to "
190: + container.getClassLoader());
191:
192: // start any transaction
193: try {
194: startTransaction("beforeDelivery", mi, container);
195: setBeforeDeliveryInvoke(true);
196: } catch (Throwable t) {
197: setBeforeDeliveryInvoke(false);
198: resetContextClassLoader(mi);
199: throw new ResourceException(t);
200: }
201: }
202:
203: /**
204: * After delivery processing.
205: *
206: * @param mi the invocation
207: * @throws Throwable for any error
208: */
209: protected void after(Invocation mi) throws Throwable {
210: // Called out of sequence
211: if (!getBeforeDeliveryInvoke()) {
212: throw new IllegalStateException(
213: "afterDelivery without a previous beforeDelivery for message endpoint "
214: + getProxyString(mi));
215:
216: }
217:
218: // Finish this delivery committing if we can
219: try {
220: finish("afterDelivery", mi, true);
221: } catch (Throwable t) {
222: throw new ResourceException(t);
223:
224: }
225: }
226:
227: /**
228: * Delivery.
229: *
230: * @param mi the invocation
231: * @return the result of the delivery
232: * @throws Throwable for any error
233: */
234: protected Object delivery(Invocation mi) throws Throwable {
235: // Have we already delivered a message?
236: if (delivered)
237: throw new IllegalStateException(
238: "Multiple message delivery between before and after delivery is not allowed for message endpoint "
239: + getProxyString(mi));
240:
241: if (trace)
242: log.trace("MessageEndpoint " + getProxyString(mi)
243: + " delivering");
244:
245: // Mark delivery if beforeDelivery was invoked
246: if (oldClassLoader != null)
247: delivered = true;
248:
249: MessageDrivenContainer container = getContainer(mi);
250: boolean commit = true;
251: try {
252: // Check for starting a transaction
253: if (oldClassLoader == null)
254: startTransaction("delivery", mi, container);
255: return getNext().invoke(mi);
256: } catch (Throwable t) {
257: if (trace)
258: log.trace("MessageEndpoint " + getProxyString(mi)
259: + " delivery error", t);
260: if (t instanceof Error || t instanceof RuntimeException) {
261: if (transaction != null)
262: transaction.setRollbackOnly();
263: commit = false;
264: }
265: throw t;
266: } finally {
267: // No before/after delivery, end any transaction and release the lock
268: if (oldClassLoader == null) {
269: try {
270: // Finish any transaction we started
271: endTransaction(mi, commit);
272: } finally {
273: releaseThreadLock(mi);
274: }
275: }
276: }
277: }
278:
279: /**
280: * Finish the current delivery
281: *
282: * @param context the lifecycle method
283: * @param mi the invocation
284: * @param commit whether to commit
285: * @throws Throwable for any error
286: */
287: protected void finish(String context, Invocation mi, boolean commit)
288: throws Throwable {
289: try {
290: endTransaction(mi, commit);
291: } finally {
292: setBeforeDeliveryInvoke(false);
293: // Reset delivered flag
294: delivered = false;
295: // Change back to the original context classloader
296: resetContextClassLoader(mi);
297: // We no longer hold the lock
298: releaseThreadLock(mi);
299: }
300: }
301:
302: /**
303: * Start a transaction
304: *
305: * @param context the lifecycle method
306: * @param mi the invocation
307: * @param container the container
308: * @throws Throwable for any error
309: */
310: protected void startTransaction(String context, Invocation mi,
311: MessageDrivenContainer container) throws Throwable {
312: // Get any passed resource
313: XAResource resource = (XAResource) mi.getInvocationContext()
314: .getValue(MESSAGE_ENDPOINT_XARESOURCE);
315:
316: Method method = null;
317:
318: // Normal delivery
319: if ("delivery".equals(context))
320: method = mi.getMethod();
321: // Before delivery
322: else
323: method = (Method) mi.getArguments()[0];
324:
325: // Is the delivery transacted?
326: boolean isTransacted = getMessageEndpointFactory(mi)
327: .isDeliveryTransacted(method);
328:
329: if (trace)
330: log.trace("MessageEndpoint " + getProxyString(mi) + " "
331: + context + " method=" + method + " xaResource="
332: + resource + " transacted=" + isTransacted);
333:
334: // Get the transaction status
335: TransactionManager tm = container.getTransactionManager();
336: suspended = tm.suspend();
337:
338: if (trace)
339: log.trace("MessageEndpoint " + getProxyString(mi) + " "
340: + context + " currentTx=" + suspended);
341:
342: // Delivery is transacted
343: if (isTransacted) {
344: // No transaction means we start a new transaction and enlist the resource
345: if (suspended == null) {
346: tm.begin();
347: transaction = tm.getTransaction();
348: if (trace)
349: log.trace("MessageEndpoint " + getProxyString(mi)
350: + " started transaction=" + transaction);
351:
352: // Enlist the XAResource in the transaction
353: if (resource != null) {
354: transaction.enlistResource(resource);
355: if (trace)
356: log.trace("MessageEndpoint "
357: + getProxyString(mi) + " enlisted="
358: + resource);
359: }
360: } else {
361: // If there is already a transaction we ignore the XAResource (by spec 12.5.9)
362: try {
363: tm.resume(suspended);
364: } finally {
365: suspended = null;
366: if (trace)
367: log.trace("MessageEndpoint "
368: + getProxyString(mi) + " transaction="
369: + suspended
370: + " already active, IGNORED="
371: + resource);
372: }
373: }
374: }
375: }
376:
377: /**
378: * End the transaction
379: *
380: * @param mi the invocation
381: * @param commit whether to try to commit
382: * @throws Throwable for any error
383: */
384: protected void endTransaction(Invocation mi, boolean commit)
385: throws Throwable {
386: TransactionManager tm = null;
387: Transaction currentTx = null;
388: try {
389: // If we started the transaction, commit it
390: if (transaction != null) {
391: tm = getContainer(mi).getTransactionManager();
392: currentTx = tm.getTransaction();
393:
394: // Suspend any bad transaction - there is bug somewhere, but we will try to tidy things up
395: if (currentTx != null
396: && currentTx.equals(transaction) == false) {
397: log.warn("Current transaction " + currentTx
398: + " is not the expected transaction.");
399: tm.suspend();
400: tm.resume(transaction);
401: } else {
402: // We have the correct transaction
403: currentTx = null;
404: }
405:
406: // Commit or rollback depending on the status
407: if (commit == false
408: || transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
409: if (trace)
410: log.trace("MessageEndpoint "
411: + getProxyString(mi) + " rollback");
412: tm.rollback();
413: } else {
414: if (trace)
415: log.trace("MessageEndpoint "
416: + getProxyString(mi) + " commit");
417: tm.commit();
418: }
419: }
420:
421: // If we suspended the incoming transaction, resume it
422: if (suspended != null) {
423: try {
424: tm = getContainer(mi).getTransactionManager();
425: tm.resume(suspended);
426: } finally {
427: suspended = null;
428: }
429: }
430: } finally {
431: // Resume any suspended transaction
432: if (currentTx != null) {
433: try {
434: tm.resume(currentTx);
435: } catch (Throwable t) {
436: log.warn("MessageEndpoint " + getProxyString(mi)
437: + " failed to resume old transaction "
438: + currentTx);
439:
440: }
441: }
442: }
443: }
444:
445: /**
446: * Reset the context classloader
447: *
448: * @param mi the invocation
449: */
450: protected void resetContextClassLoader(Invocation mi) {
451: if (trace)
452: log.trace("MessageEndpoint " + getProxyString(mi)
453: + " reset classloader " + oldClassLoader);
454: SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
455: oldClassLoader = null;
456: }
457:
458: protected void setBeforeDeliveryInvoke(boolean bdi) {
459: this .beforeDeliveryInvoked = bdi;
460:
461: }
462:
463: protected boolean getBeforeDeliveryInvoke() {
464: return this .beforeDeliveryInvoked;
465:
466: }
467:
468: /**
469: * Release the thread lock
470: *
471: * @param mi the invocation
472: */
473: protected void releaseThreadLock(Invocation mi) {
474: if (trace)
475: log.trace("MessageEndpoint " + getProxyString(mi)
476: + " no longer in use by " + inUseThread);
477: inUseThread = null;
478: }
479:
480: /**
481: * Get our proxy's string value.
482: *
483: * @param mi the invocation
484: * @return the string
485: */
486: protected String getProxyString(Invocation mi) {
487: if (cachedProxyString == null)
488: cachedProxyString = mi.getInvocationContext().getCacheId()
489: .toString();
490: return cachedProxyString;
491: }
492:
493: /**
494: * Get the message endpoint factory
495: *
496: * @return the message endpoint factory
497: */
498: protected JBossMessageEndpointFactory getMessageEndpointFactory(
499: Invocation mi) {
500: if (endpointFactory == null)
501: endpointFactory = (JBossMessageEndpointFactory) mi
502: .getInvocationContext().getValue(
503: MESSAGE_ENDPOINT_FACTORY);
504: return endpointFactory;
505: }
506:
507: /**
508: * Get the container
509: *
510: * @return the container
511: */
512: protected MessageDrivenContainer getContainer(Invocation mi) {
513: return getMessageEndpointFactory(mi).getContainer();
514: }
515:
516: // Private -------------------------------------------------------
517:
518: // Inner classes -------------------------------------------------
519: }
|