001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ejb3.mdb.inflow;
023:
024: import java.lang.reflect.Method;
025: import java.lang.reflect.InvocationHandler;
026:
027: import javax.resource.ResourceException;
028: import javax.resource.spi.endpoint.MessageEndpointFactory;
029: import javax.transaction.Status;
030: import javax.transaction.Transaction;
031: import javax.transaction.TransactionManager;
032: import javax.transaction.xa.XAResource;
033:
034: import org.jboss.aop.joinpoint.Invocation;
035: import org.jboss.aop.joinpoint.MethodInvocation;
036: import org.jboss.aop.MethodInfo;
037: import org.jboss.ejb3.mdb.MessagingContainer;
038: import org.jboss.ejb3.mdb.MDB;
039: import org.jboss.ejb3.tx.TxUtil;
040: import org.jboss.logging.Logger;
041:
042: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
043:
044: /**
045: * @version <tt>$Revision: 60233 $</tt>
046: * @author <a href="mailto:bdecoste@jboss.com">William DeCoste</a>
047: */
048: public class MessageInflowLocalProxy implements InvocationHandler {
049: private static final Logger log = Logger
050: .getLogger(MessageInflowLocalProxy.class);
051:
052: /** The key for the factory */
053: public static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpoint.Factory";
054:
055: /** The key for the xa resource */
056: public static final String MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource";
057:
058: /** Whether trace is enabled */
059: private boolean trace = log.isTraceEnabled();
060:
061: /** Cached version of our proxy string */
062: private String cachedProxyString = null;
063:
064: /** Whether this proxy has been released */
065: protected SynchronizedBoolean released = new SynchronizedBoolean(
066: false);
067:
068: /** Whether we have delivered a message */
069: protected boolean delivered = false;
070:
071: /** The in use thread */
072: protected Thread inUseThread = null;
073:
074: /** The old classloader of the thread */
075: protected ClassLoader oldClassLoader = null;
076:
077: /** Any transaction we started */
078: protected Transaction transaction = null;
079:
080: /** Any suspended transaction */
081: protected Transaction suspended = null;
082:
083: /** The message endpoint factory */
084: private JBossMessageEndpointFactory endpointFactory;
085:
086: private XAResource resource;
087: private MessageEndpointFactory messageEndpointFactory;
088:
089: MessagingContainer container;
090:
091: protected MessageInflowLocalProxy(MessagingContainer container) {
092: this .container = container;
093: }
094:
095: public void setMessageEndpointFactory(
096: MessageEndpointFactory messageEndpointFactory) {
097: this .messageEndpointFactory = messageEndpointFactory;
098: }
099:
100: public void setXaResource(XAResource resource) {
101: this .resource = resource;
102: }
103:
104: public Object invoke(Object proxy, Method method, Object[] args)
105: throws Throwable {
106: // Are we still useable?
107: if (released.get())
108: throw new IllegalStateException("This message endpoint + "
109: + getProxyString(proxy) + " has been released");
110:
111: // Concurrent invocation?
112: Thread currentThread = Thread.currentThread();
113: if (inUseThread != null
114: && inUseThread.equals(currentThread) == false)
115: throw new IllegalStateException("This message endpoint + "
116: + getProxyString(proxy)
117: + " is already in use by another thread "
118: + inUseThread);
119: inUseThread = currentThread;
120:
121: if (trace)
122: log.trace("MessageEndpoint " + getProxyString(proxy)
123: + " in use by " + method + " " + inUseThread);
124:
125: // Which operation?
126: if (method.getName().equals("release")) {
127: release(proxy);
128: return null;
129: } else if (method.getName().equals("beforeDelivery")) {
130: before(proxy, container, method, args);
131: return null;
132: } else if (method.getName().equals("afterDelivery")) {
133: after(proxy);
134: return null;
135: } else
136: return delivery(proxy, container, method, args);
137: }
138:
139: public String toString() {
140: return container.getEjbName().toString();
141: }
142:
143: // -----------------------------------------------------------
144:
145: /**
146: * Release this message endpoint.
147: *
148: * @param mi the invocation
149: * @throws Throwable for any error
150: */
151: protected void release(Object proxy) throws Throwable {
152: // We are now released
153: released.set(true);
154:
155: if (trace)
156: log.trace("MessageEndpoint " + getProxyString(proxy)
157: + " released");
158:
159: // Tidyup any outstanding delivery
160: if (oldClassLoader != null) {
161: try {
162: finish("release", proxy, false);
163: } catch (Throwable t) {
164: log.warn("Error in release ", t);
165: }
166: }
167: }
168:
169: /**
170: * Before delivery processing.
171: *
172: * @param mi the invocation
173: * @throws Throwable for any error
174: */
175: protected void before(Object proxy, MessagingContainer container,
176: Method method, Object[] args) throws Throwable {
177: // Called out of sequence
178: if (oldClassLoader != null)
179: throw new IllegalStateException(
180: "Missing afterDelivery from the previous beforeDelivery for message endpoint "
181: + getProxyString(proxy));
182:
183: if (trace)
184: log.trace("MessageEndpoint " + getProxyString(proxy)
185: + " released");
186:
187: // Set the classloader
188: oldClassLoader = GetTCLAction
189: .getContextClassLoader(inUseThread);
190: SetTCLAction.setContextClassLoader(inUseThread, container
191: .getClassloader());
192: if (trace)
193: log.trace("MessageEndpoint " + getProxyString(proxy)
194: + " set context classloader to "
195: + container.getClassloader());
196:
197: // start any transaction
198: try {
199: // Is the delivery transacted?
200: MethodInfo methodInfo = container
201: .getMethodInfo((Method) args[0]);
202: boolean isTransacted = messageEndpointFactory
203: .isDeliveryTransacted(methodInfo.getAdvisedMethod());
204:
205: startTransaction("beforeDelivery", proxy, container,
206: method, args, isTransacted);
207: } catch (Throwable t) {
208: resetContextClassLoader(proxy);
209: throw new ResourceException(t);
210: }
211: }
212:
213: /**
214: * After delivery processing.
215: *
216: * @param mi the invocation
217: * @throws Throwable for any error
218: */
219: protected void after(Object proxy) throws Throwable {
220: // Called out of sequence
221: if (oldClassLoader == null)
222: throw new IllegalStateException(
223: "afterDelivery without a previous beforeDelivery for message endpoint "
224: + getProxyString(proxy));
225:
226: // Finish this delivery committing if we can
227: try {
228: finish("afterDelivery", proxy, true);
229: } catch (Throwable t) {
230: throw new ResourceException(t);
231: }
232: }
233:
234: /**
235: * Delivery.
236: *
237: * @param mi the invocation
238: * @return the result of the delivery
239: * @throws Throwable for any error
240: */
241: protected Object delivery(Object proxy,
242: MessagingContainer container, Method method, Object[] args)
243: throws Throwable {
244: // Have we already delivered a message?
245: if (delivered)
246: throw new IllegalStateException(
247: "Multiple message delivery between before and after delivery is not allowed for message endpoint "
248: + getProxyString(proxy));
249:
250: if (trace)
251: log.trace("MessageEndpoint " + getProxyString(proxy)
252: + " delivering");
253:
254: // Mark delivery if beforeDelivery was invoked
255: if (oldClassLoader != null)
256: delivered = true;
257:
258: boolean commit = true;
259: // Is the delivery transacted?
260: MethodInfo methodInfo = container.getMethodInfo(method);
261:
262: try {
263: // Check for starting a transaction
264: if (oldClassLoader == null) {
265: boolean isTransacted = messageEndpointFactory
266: .isDeliveryTransacted(methodInfo
267: .getAdvisedMethod());
268: startTransaction("delivery", proxy, container, method,
269: args, isTransacted);
270: }
271: return container.localInvoke(methodInfo, args);
272: } catch (Throwable t) {
273: if (trace)
274: log.trace("MessageEndpoint " + getProxyString(proxy)
275: + " delivery error", t);
276: if (t instanceof Error || t instanceof RuntimeException) {
277: if (transaction != null)
278: transaction.setRollbackOnly();
279: commit = false;
280: }
281: throw t;
282: } finally {
283: // No before/after delivery, end any transaction and release the lock
284: if (oldClassLoader == null) {
285: try {
286: // Finish any transaction we started
287: endTransaction(proxy, commit);
288: } finally {
289: releaseThreadLock(proxy);
290: }
291: }
292: }
293: }
294:
295: /**
296: * Finish the current delivery
297: *
298: * @param context the lifecycle method
299: * @param mi the invocation
300: * @param commit whether to commit
301: * @throws Throwable for any error
302: */
303: protected void finish(String context, Object proxy, boolean commit)
304: throws Throwable {
305: try {
306: endTransaction(proxy, commit);
307: } finally {
308: // Reset delivered flag
309: delivered = false;
310: // Change back to the original context classloader
311: resetContextClassLoader(proxy);
312: // We no longer hold the lock
313: releaseThreadLock(proxy);
314: }
315: }
316:
317: /**
318: * Start a transaction
319: *
320: * @param context the lifecycle method
321: * @param mi the invocation
322: * @param container the container
323: * @throws Throwable for any error
324: */
325: protected void startTransaction(String context, Object proxy,
326: MessagingContainer container, Method m, Object[] args,
327: boolean isTransacted) throws Throwable {
328: Method method;
329:
330: // Normal delivery
331: if ("delivery".equals(context))
332: method = m;
333: // Before delivery
334: else
335: method = (Method) args[0];
336:
337: if (trace)
338: log.trace("MessageEndpoint " + getProxyString(proxy) + " "
339: + context + " method=" + method + " xaResource="
340: + resource + " transacted=" + isTransacted);
341:
342: // Get the transaction status
343: TransactionManager tm = TxUtil.getTransactionManager(); //container.getTransactionManager();
344: suspended = tm.suspend();
345:
346: if (trace)
347: log.trace("MessageEndpoint " + getProxyString(proxy) + " "
348: + context + " currentTx=" + suspended);
349:
350: // Delivery is transacted
351: if (isTransacted) {
352: // No transaction means we start a new transaction and enlist the resource
353: if (suspended == null) {
354: tm.begin();
355: transaction = tm.getTransaction();
356: if (trace)
357: log.trace("MessageEndpoint "
358: + getProxyString(proxy)
359: + " started transaction=" + transaction);
360:
361: // Enlist the XAResource in the transaction
362: if (resource != null) {
363: transaction.enlistResource(resource);
364: if (trace)
365: log.trace("MessageEndpoint "
366: + getProxyString(proxy) + " enlisted="
367: + resource);
368: }
369: } else {
370: // If there is already a transaction we ignore the XAResource (by spec 12.5.9)
371: try {
372: tm.resume(suspended);
373: } finally {
374: suspended = null;
375: if (trace)
376: log.trace("MessageEndpoint "
377: + getProxyString(proxy)
378: + " transaction=" + suspended
379: + " already active, IGNORED="
380: + resource);
381: }
382: }
383: }
384: }
385:
386: /**
387: * End the transaction
388: *
389: * @param mi the invocation
390: * @param commit whether to try to commit
391: * @throws Throwable for any error
392: */
393: protected void endTransaction(Object proxy, boolean commit)
394: throws Throwable {
395: TransactionManager tm = null;
396: Transaction currentTx = null;
397: try {
398: // If we started the transaction, commit it
399: if (transaction != null) {
400: tm = TxUtil.getTransactionManager(); //getContainer(mi).getTransactionManager();
401: currentTx = tm.getTransaction();
402:
403: // Suspend any bad transaction - there is bug somewhere, but we will try to tidy things up
404: if (currentTx != null
405: && currentTx.equals(transaction) == false) {
406: log.warn("Current transaction " + currentTx
407: + " is not the expected transaction.");
408: tm.suspend();
409: tm.resume(transaction);
410: } else {
411: // We have the correct transaction
412: currentTx = null;
413: }
414:
415: // Commit or rollback depending on the status
416: if (commit == false
417: || transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
418: if (trace)
419: log.trace("MessageEndpoint "
420: + getProxyString(proxy) + " rollback");
421: tm.rollback();
422: } else {
423: if (trace)
424: log.trace("MessageEndpoint "
425: + getProxyString(proxy) + " commit");
426: tm.commit();
427: }
428: }
429:
430: // If we suspended the incoming transaction, resume it
431: if (suspended != null) {
432: try {
433: tm = TxUtil.getTransactionManager(); //getContainer(mi).getTransactionManager();
434: tm.resume(suspended);
435: } finally {
436: suspended = null;
437: }
438: }
439: } finally {
440: // Resume any suspended transaction
441: if (currentTx != null) {
442: try {
443: tm.resume(currentTx);
444: } catch (Throwable t) {
445: log.warn("MessageEndpoint " + getProxyString(proxy)
446: + " failed to resume old transaction "
447: + currentTx);
448:
449: }
450: }
451: }
452: }
453:
454: /**
455: * Reset the context classloader
456: *
457: * @param mi the invocation
458: */
459: protected void resetContextClassLoader(Object proxy) {
460: if (trace)
461: log.trace("MessageEndpoint " + getProxyString(proxy)
462: + " reset classloader " + oldClassLoader);
463: SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
464: oldClassLoader = null;
465: }
466:
467: /**
468: * Release the thread lock
469: *
470: * @param mi the invocation
471: */
472: protected void releaseThreadLock(Object proxy) {
473: if (trace)
474: log.trace("MessageEndpoint " + getProxyString(proxy)
475: + " no longer in use by " + inUseThread);
476: inUseThread = null;
477: }
478:
479: /**
480: * Get our proxy's string value.
481: *
482: * @param mi the invocation
483: * @return the string
484: */
485: protected String getProxyString(Object proxy) {
486: if (cachedProxyString == null)
487: cachedProxyString = container.getEjbName();
488: return cachedProxyString;
489: }
490:
491: /**
492: * Get the message endpoint factory
493: *
494: * @return the message endpoint factory
495: */
496: protected JBossMessageEndpointFactory getMessageEndpointFactory(
497: Invocation invocation) {
498: if (endpointFactory == null) {
499: MethodInvocation mi = (MethodInvocation) invocation;
500: endpointFactory = (JBossMessageEndpointFactory) mi
501: .getResponseAttachment(MESSAGE_ENDPOINT_FACTORY);
502: }
503: return endpointFactory;
504: }
505:
506: /**
507: * Get the container
508: *
509: * @return the container
510: */
511: protected MessagingContainer getContainer(Invocation mi) {
512: return getMessageEndpointFactory(mi).getContainer();
513: }
514: }
|