001: /**
002: *
003: * Licensed to the Apache Software Foundation (ASF) under one or more
004: * contributor license agreements. See the NOTICE file distributed with
005: * this work for additional information regarding copyright ownership.
006: * The ASF licenses this file to You under the Apache License, Version 2.0
007: * (the "License"); you may not use this file except in compliance with
008: * the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: */package org.apache.openejb.core.mdb;
018:
019: import org.apache.openejb.OpenEJBException;
020: import org.apache.openejb.DeploymentInfo;
021: import org.apache.openejb.SystemException;
022: import org.apache.openejb.ApplicationException;
023: import org.apache.openejb.ContainerType;
024: import org.apache.openejb.RpcContainer;
025: import org.apache.openejb.core.BaseContext;
026: import org.apache.openejb.core.CoreDeploymentInfo;
027: import org.apache.openejb.core.Operation;
028: import org.apache.openejb.core.ThreadContext;
029: import org.apache.openejb.core.timer.EjbTimerService;
030: import org.apache.openejb.core.interceptor.InterceptorData;
031: import org.apache.openejb.core.interceptor.InterceptorStack;
032: import org.apache.openejb.core.transaction.TransactionContainer;
033: import org.apache.openejb.core.transaction.TransactionContext;
034: import org.apache.openejb.core.transaction.TransactionPolicy;
035: import org.apache.openejb.spi.SecurityService;
036: import org.apache.openejb.util.LogCategory;
037: import org.apache.openejb.util.Logger;
038:
039: import org.apache.xbean.recipe.ObjectRecipe;
040: import org.apache.xbean.recipe.Option;
041:
042: import javax.transaction.TransactionManager;
043: import javax.transaction.Transaction;
044: import javax.transaction.xa.XAResource;
045: import javax.resource.spi.ResourceAdapter;
046: import javax.resource.spi.ActivationSpec;
047: import javax.resource.spi.UnavailableException;
048: import javax.resource.ResourceException;
049: import java.lang.reflect.Method;
050: import java.util.List;
051: import java.util.Map;
052: import java.util.Arrays;
053: import java.util.TreeSet;
054: import java.util.Set;
055: import java.util.concurrent.ConcurrentMap;
056: import java.util.concurrent.ConcurrentHashMap;
057:
058: public class MdbContainer implements RpcContainer, TransactionContainer {
059: private static final Logger logger = Logger.getInstance(
060: LogCategory.OPENEJB, "org.apache.openejb.util.resources");
061: private static final Object[] NO_ARGS = new Object[0];
062:
063: private final Object containerID;
064: private final TransactionManager transactionManager;
065: private final SecurityService securityService;
066: private final ResourceAdapter resourceAdapter;
067: private final Class messageListenerInterface;
068: private final Class activationSpecClass;
069: private final int instanceLimit;
070: private final boolean txRecovery;
071:
072: private final ConcurrentMap<Object, CoreDeploymentInfo> deployments = new ConcurrentHashMap<Object, CoreDeploymentInfo>();
073:
074: public MdbContainer(Object containerID,
075: TransactionManager transactionManager,
076: SecurityService securityService,
077: ResourceAdapter resourceAdapter,
078: Class messageListenerInterface, Class activationSpecClass,
079: int instanceLimit, boolean txRecovery) {
080: this .containerID = containerID;
081: this .transactionManager = transactionManager;
082: this .securityService = securityService;
083: this .resourceAdapter = resourceAdapter;
084: this .messageListenerInterface = messageListenerInterface;
085: this .activationSpecClass = activationSpecClass;
086: this .instanceLimit = instanceLimit;
087: this .txRecovery = txRecovery;
088: }
089:
090: public DeploymentInfo[] deployments() {
091: return deployments.values().toArray(
092: new DeploymentInfo[deployments.size()]);
093: }
094:
095: public DeploymentInfo getDeploymentInfo(Object deploymentID) {
096: return deployments.get(deploymentID);
097: }
098:
099: public ContainerType getContainerType() {
100: return ContainerType.MESSAGE_DRIVEN;
101: }
102:
103: public Object getContainerID() {
104: return containerID;
105: }
106:
107: public ResourceAdapter getResourceAdapter() {
108: return resourceAdapter;
109: }
110:
111: public Class getMessageListenerInterface() {
112: return messageListenerInterface;
113: }
114:
115: public Class getActivationSpecClass() {
116: return activationSpecClass;
117: }
118:
119: public void deploy(DeploymentInfo info) throws OpenEJBException {
120: CoreDeploymentInfo deploymentInfo = (CoreDeploymentInfo) info;
121: Object deploymentId = deploymentInfo.getDeploymentID();
122: if (!deploymentInfo.getMdbInterface().equals(
123: messageListenerInterface)) {
124: throw new OpenEJBException("Deployment '" + deploymentId
125: + "' has message listener interface "
126: + deploymentInfo.getMdbInterface().getName()
127: + " but this MDB container only supports "
128: + messageListenerInterface);
129: }
130:
131: // create the activation spec
132: ActivationSpec activationSpec = createActivationSpec(deploymentInfo);
133:
134: // create the message endpoint
135: MdbInstanceFactory instanceFactory = new MdbInstanceFactory(
136: deploymentInfo, transactionManager, securityService,
137: instanceLimit);
138: EndpointFactory endpointFactory = new EndpointFactory(
139: activationSpec, this , deploymentInfo, instanceFactory,
140: txRecovery);
141:
142: // update the data structures
143: // this must be done before activating the endpoint since the ra may immedately begin delivering messages
144: deploymentInfo.setContainer(this );
145: deploymentInfo.setContainerData(endpointFactory);
146: deployments.put(deploymentId, deploymentInfo);
147:
148: // activate the endpoint
149: try {
150: resourceAdapter.endpointActivation(endpointFactory,
151: activationSpec);
152: } catch (ResourceException e) {
153: // activation failed... clean up
154: deploymentInfo.setContainer(null);
155: deploymentInfo.setContainerData(null);
156: deployments.remove(deploymentId);
157:
158: throw new OpenEJBException(e);
159: }
160:
161: // start the timer service
162: EjbTimerService timerService = deploymentInfo
163: .getEjbTimerService();
164: if (timerService != null) {
165: timerService.start();
166: }
167: }
168:
169: private ActivationSpec createActivationSpec(
170: DeploymentInfo deploymentInfo) throws OpenEJBException {
171: try {
172: // initialize the object recipe
173: ObjectRecipe objectRecipe = new ObjectRecipe(
174: activationSpecClass);
175: objectRecipe.allow(Option.IGNORE_MISSING_PROPERTIES);
176: objectRecipe.disallow(Option.FIELD_INJECTION);
177:
178: Map<String, String> activationProperties = deploymentInfo
179: .getActivationProperties();
180: for (Map.Entry<String, String> entry : activationProperties
181: .entrySet()) {
182: objectRecipe.setMethodProperty(entry.getKey(), entry
183: .getValue());
184: }
185:
186: // create the activationSpec
187: ActivationSpec activationSpec = (ActivationSpec) objectRecipe
188: .create(activationSpecClass.getClassLoader());
189:
190: // verify all properties except "destination" and "destinationType" were consumed
191: Set<String> unusedProperties = new TreeSet<String>(
192: objectRecipe.getUnsetProperties().keySet());
193: unusedProperties.remove("destination");
194: unusedProperties.remove("destinationType");
195: if (!unusedProperties.isEmpty()) {
196: throw new IllegalArgumentException(
197: "No setter found for the activation spec properties: "
198: + unusedProperties);
199: }
200:
201: // validate the activation spec
202: try {
203: activationSpec.validate();
204: } catch (UnsupportedOperationException uoe) {
205: logger
206: .info("ActivationSpec does not support validate. Implementation of validate is optional");
207: }
208:
209: // set the resource adapter into the activation spec
210: activationSpec.setResourceAdapter(resourceAdapter);
211:
212: return activationSpec;
213: } catch (Exception e) {
214: throw new OpenEJBException(
215: "Unable to create activation spec", e);
216: }
217: }
218:
219: public void undeploy(DeploymentInfo info) throws OpenEJBException {
220: if (!(info instanceof CoreDeploymentInfo)) {
221: return;
222: }
223:
224: CoreDeploymentInfo deploymentInfo = (CoreDeploymentInfo) info;
225: try {
226: EndpointFactory endpointFactory = (EndpointFactory) deploymentInfo
227: .getContainerData();
228: if (endpointFactory != null) {
229: resourceAdapter.endpointDeactivation(endpointFactory,
230: endpointFactory.getActivationSpec());
231: }
232: } finally {
233: deploymentInfo.setContainer(null);
234: deploymentInfo.setContainerData(null);
235: deployments.remove(deploymentInfo.getDeploymentID());
236: }
237: }
238:
239: /**
240: * @deprecated use invoke signature without 'securityIdentity' argument.
241: */
242: public Object invoke(Object deployID, Method callMethod,
243: Object[] args, Object primKey, Object securityIdentity)
244: throws OpenEJBException {
245: return invoke(deployID, callMethod.getDeclaringClass(),
246: callMethod, args, primKey);
247: }
248:
249: public Object invoke(Object deploymentId, Class callInterface,
250: Method method, Object[] args, Object primKey)
251: throws OpenEJBException {
252: CoreDeploymentInfo deploymentInfo = (CoreDeploymentInfo) getDeploymentInfo(deploymentId);
253:
254: EndpointFactory endpointFactory = (EndpointFactory) deploymentInfo
255: .getContainerData();
256: MdbInstanceFactory instanceFactory = endpointFactory
257: .getInstanceFactory();
258: Instance instance = null;
259: try {
260: instance = (Instance) instanceFactory.createInstance(true);
261: } catch (UnavailableException e) {
262: throw new SystemException(
263: "Unable to create instance for invocation", e);
264: }
265:
266: try {
267: beforeDelivery(deploymentInfo, instance, method, null);
268: Object value = invoke(instance, method, args);
269: afterDelivery(instance);
270: return value;
271: } finally {
272: instanceFactory.freeInstance(instance, true);
273: }
274: }
275:
276: public void beforeDelivery(CoreDeploymentInfo deployInfo,
277: Object instance, Method method, XAResource xaResource)
278: throws SystemException {
279: // intialize call context
280: ThreadContext callContext = new ThreadContext(deployInfo, null);
281: ThreadContext oldContext = ThreadContext.enter(callContext);
282:
283: // create mdb context
284: MdbCallContext mdbCallContext = new MdbCallContext();
285: callContext.set(MdbCallContext.class, mdbCallContext);
286: mdbCallContext.deliveryMethod = method;
287: mdbCallContext.oldCallContext = oldContext;
288:
289: // add tx data
290: mdbCallContext.txPolicy = deployInfo
291: .getTransactionPolicy(method);
292: mdbCallContext.txContext = new TransactionContext(callContext,
293: transactionManager);
294:
295: // call the tx before method
296: try {
297: boolean adapterTransaction = transactionManager
298: .getTransaction() != null;
299: mdbCallContext.txPolicy.beforeInvoke(instance,
300: mdbCallContext.txContext);
301:
302: // if we have an xaResource and a transaction was not imported from the adapter, enlist the xaResource
303: if (xaResource != null && !adapterTransaction) {
304: Transaction transaction = transactionManager
305: .getTransaction();
306: if (transaction != null) {
307: transaction.enlistResource(xaResource);
308: }
309: }
310: } catch (ApplicationException e) {
311: ThreadContext.exit(oldContext);
312: throw new SystemException(
313: "Should never get an Application exception", e);
314: } catch (SystemException e) {
315: ThreadContext.exit(oldContext);
316: throw e;
317: } catch (Exception e) {
318: ThreadContext.exit(oldContext);
319: throw new SystemException(
320: "Unable to enlist xa resource in the transaction",
321: e);
322: }
323: }
324:
325: public Object invoke(Object instance, Method method, Object... args)
326: throws SystemException, ApplicationException {
327: if (args == null) {
328: args = NO_ARGS;
329: }
330:
331: // get the context data
332: ThreadContext callContext = ThreadContext.getThreadContext();
333: CoreDeploymentInfo deployInfo = callContext.getDeploymentInfo();
334: MdbCallContext mdbCallContext = callContext
335: .get(MdbCallContext.class);
336:
337: if (mdbCallContext == null) {
338: throw new IllegalStateException(
339: "beforeDelivery was not called");
340: }
341:
342: // verify the delivery method passed to beforeDeliver is the same method that was invoked
343: if (!mdbCallContext.deliveryMethod.getName().equals(
344: method.getName())
345: || !Arrays.deepEquals(mdbCallContext.deliveryMethod
346: .getParameterTypes(), method
347: .getParameterTypes())) {
348: throw new IllegalStateException(
349: "Delivery method specified in beforeDelivery is not the delivery method called");
350: }
351:
352: // remember the return value or exception so it can be logged
353: Object returnValue = null;
354: OpenEJBException openEjbException = null;
355: Operation oldOperation = callContext.getCurrentOperation();
356: callContext.setCurrentOperation(Operation.BUSINESS);
357: BaseContext.State[] originalStates = callContext
358: .setCurrentAllowedStates(MdbContext.getStates());
359: try {
360: if (logger.isInfoEnabled()) {
361: logger.info("invoking method " + method.getName()
362: + " on " + deployInfo.getDeploymentID());
363: }
364:
365: // determine the target method on the bean instance class
366: Method targetMethod = deployInfo
367: .getMatchingBeanMethod(method);
368:
369: callContext.set(Method.class, targetMethod);
370:
371: // invoke the target method
372: returnValue = _invoke(instance, targetMethod, args,
373: deployInfo, mdbCallContext);
374: return returnValue;
375: } catch (ApplicationException e) {
376: openEjbException = e;
377: throw e;
378: } catch (SystemException e) {
379: openEjbException = e;
380: throw e;
381: } finally {
382: callContext.setCurrentOperation(oldOperation);
383: callContext.setCurrentAllowedStates(originalStates);
384: // Log the invocation results
385: if (logger.isDebugEnabled()) {
386: if (openEjbException == null) {
387: logger.debug("finished invoking method "
388: + method.getName() + ". Return value:"
389: + returnValue);
390: } else {
391: Throwable exception = (openEjbException
392: .getRootCause() != null) ? openEjbException
393: .getRootCause() : openEjbException;
394: logger.debug("finished invoking method "
395: + method.getName() + " with exception "
396: + exception);
397: }
398: }
399: }
400: }
401:
402: private Object _invoke(Object instance, Method runMethod,
403: Object[] args, DeploymentInfo deploymentInfo,
404: MdbCallContext mdbCallContext) throws SystemException,
405: ApplicationException {
406: Object returnValue = null;
407: try {
408: List<InterceptorData> interceptors = deploymentInfo
409: .getMethodInterceptors(runMethod);
410: InterceptorStack interceptorStack = new InterceptorStack(
411: ((Instance) instance).bean, runMethod,
412: Operation.BUSINESS, interceptors,
413: ((Instance) instance).interceptors);
414: returnValue = interceptorStack.invoke(args);
415: return returnValue;
416: } catch (java.lang.reflect.InvocationTargetException ite) {// handle exceptions thrown by enterprise bean
417: if (!isApplicationException(deploymentInfo, ite
418: .getTargetException())) {
419: //
420: /// System Exception ****************************
421: mdbCallContext.txPolicy.handleSystemException(ite
422: .getTargetException(), instance,
423: mdbCallContext.txContext);
424: } else {
425: //
426: // Application Exception ***********************
427: mdbCallContext.txPolicy.handleApplicationException(ite
428: .getTargetException(), false,
429: mdbCallContext.txContext);
430: }
431: } catch (Throwable re) {// handle reflection exception
432: // Any exception thrown by reflection; not by the enterprise bean. Possible
433: // Exceptions are:
434: // IllegalAccessException - if the underlying method is inaccessible.
435: // IllegalArgumentException - if the number of actual and formal parameters differ, or if an unwrapping conversion fails.
436: // NullPointerException - if the specified object is null and the method is an instance method.
437: // ExceptionInInitializerError - if the initialization provoked by this method fails.
438: if (!isApplicationException(deploymentInfo, re)) {
439: //
440: /// System Exception ****************************
441: mdbCallContext.txPolicy.handleSystemException(re,
442: instance, mdbCallContext.txContext);
443: } else {
444: //
445: // Application Exception ***********************
446: mdbCallContext.txPolicy.handleApplicationException(re,
447: false, mdbCallContext.txContext);
448: }
449: }
450: throw new AssertionError("Should not get here");
451: }
452:
453: private boolean isApplicationException(
454: DeploymentInfo deploymentInfo, Throwable e) {
455: return e instanceof Exception
456: && !(e instanceof RuntimeException);
457: }
458:
459: public void afterDelivery(Object instance) throws SystemException {
460: // get the mdb call context
461: ThreadContext callContext = ThreadContext.getThreadContext();
462: MdbCallContext mdbCallContext = callContext
463: .get(MdbCallContext.class);
464:
465: // invoke the tx after method
466: try {
467: mdbCallContext.txPolicy.afterInvoke(instance,
468: mdbCallContext.txContext);
469: } catch (ApplicationException e) {
470: throw new SystemException(
471: "Should never get an Application exception", e);
472: } finally {
473: ThreadContext.exit(mdbCallContext.oldCallContext);
474: }
475: }
476:
477: public void release(CoreDeploymentInfo deployInfo, Object instance) {
478: // get the mdb call context
479: ThreadContext callContext = ThreadContext.getThreadContext();
480: if (callContext == null) {
481: callContext = new ThreadContext(deployInfo, null);
482: ThreadContext.enter(callContext);
483:
484: }
485:
486: // if we have an mdb call context we need to invoke the after invoke method
487: MdbCallContext mdbCallContext = callContext
488: .get(MdbCallContext.class);
489: if (mdbCallContext != null) {
490: try {
491: mdbCallContext.txPolicy.afterInvoke(instance,
492: mdbCallContext.txContext);
493: } catch (Exception e) {
494: logger.error("error while releasing message endpoint",
495: e);
496: } finally {
497: EndpointFactory endpointFactory = (EndpointFactory) deployInfo
498: .getContainerData();
499: endpointFactory.getInstanceFactory().freeInstance(
500: (Instance) instance, false);
501: ThreadContext.exit(mdbCallContext.oldCallContext);
502: }
503: }
504:
505: }
506:
507: private static class MdbCallContext {
508: private Method deliveryMethod;
509: private TransactionPolicy txPolicy;
510: private TransactionContext txContext;
511: private ThreadContext oldCallContext;
512: }
513:
514: public void discardInstance(Object instance, ThreadContext context) {
515: }
516: }
|