001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.bean;
018:
019: import java.lang.reflect.Field;
020: import java.lang.reflect.Method;
021: import java.util.Map;
022: import java.util.MissingResourceException;
023: import java.util.concurrent.ConcurrentHashMap;
024: import java.util.concurrent.Future;
025: import java.util.logging.Logger;
026:
027: import javax.annotation.PostConstruct;
028: import javax.annotation.PreDestroy;
029: import javax.annotation.Resource;
030: import javax.jbi.JBIException;
031: import javax.jbi.component.ComponentContext;
032: import javax.jbi.management.MBeanNames;
033: import javax.jbi.messaging.DeliveryChannel;
034: import javax.jbi.messaging.ExchangeStatus;
035: import javax.jbi.messaging.InOut;
036: import javax.jbi.messaging.MessageExchange;
037: import javax.jbi.messaging.MessageExchange.Role;
038: import javax.jbi.messaging.MessageExchangeFactory;
039: import javax.jbi.messaging.MessagingException;
040: import javax.jbi.messaging.NormalizedMessage;
041: import javax.jbi.servicedesc.ServiceEndpoint;
042: import javax.management.MBeanServer;
043: import javax.naming.InitialContext;
044: import javax.xml.namespace.QName;
045:
046: import org.w3c.dom.Document;
047: import org.w3c.dom.DocumentFragment;
048:
049: import org.aopalliance.intercept.MethodInvocation;
050: import org.apache.commons.jexl.Expression;
051: import org.apache.commons.jexl.ExpressionFactory;
052: import org.apache.commons.jexl.JexlContext;
053: import org.apache.commons.jexl.JexlHelper;
054: import org.apache.servicemix.MessageExchangeListener;
055: import org.apache.servicemix.bean.support.BeanInfo;
056: import org.apache.servicemix.bean.support.DefaultMethodInvocationStrategy;
057: import org.apache.servicemix.bean.support.DestinationImpl;
058: import org.apache.servicemix.bean.support.Holder;
059: import org.apache.servicemix.bean.support.MethodInvocationStrategy;
060: import org.apache.servicemix.bean.support.ReflectionUtils;
061: import org.apache.servicemix.bean.support.Request;
062: import org.apache.servicemix.common.EndpointComponentContext;
063: import org.apache.servicemix.common.endpoints.ProviderEndpoint;
064: import org.apache.servicemix.expression.JAXPStringXPathExpression;
065: import org.apache.servicemix.expression.PropertyExpression;
066: import org.apache.servicemix.jbi.resolver.URIResolver;
067: import org.apache.servicemix.jbi.util.MessageUtil;
068: import org.springframework.beans.BeansException;
069: import org.springframework.context.ApplicationContext;
070: import org.springframework.context.ApplicationContextAware;
071:
072: /**
 * Represents a bean endpoint which consists of a bean together with a {@link MethodInvocationStrategy}
 * so that JBI message exchanges can be invoked on the bean.
075: *
076: * @version $Revision: $
077: * @org.apache.xbean.XBean element="endpoint"
078: */
079: public class BeanEndpoint extends ProviderEndpoint implements
080: ApplicationContextAware {
081:
082: private ApplicationContext applicationContext;
083: private String beanName;
084: private Object bean;
085: private BeanInfo beanInfo;
086: private Class<?> beanType;
087: private String beanClassName;
088: private MethodInvocationStrategy methodInvocationStrategy;
089: private org.apache.servicemix.expression.Expression correlationExpression;
090:
091: private Map<String, Holder> exchanges = new ConcurrentHashMap<String, Holder>();
092: private Map<Object, Request> requests = new ConcurrentHashMap<Object, Request>();
093: private ThreadLocal<Request> currentRequest = new ThreadLocal<Request>();
094: private ComponentContext context;
095: private DeliveryChannel channel;
096:
/**
 * Creates an endpoint with no component or service endpoint attached;
 * all properties are expected to be supplied through the setters.
 */
public BeanEndpoint() {
    super();
}
099:
/**
 * Creates an endpoint bound to the given component and service endpoint.
 * The application context is taken from the component.
 *
 * @param component       the owning bean component
 * @param serviceEndpoint the service endpoint this bean endpoint represents
 */
public BeanEndpoint(BeanComponent component, ServiceEndpoint serviceEndpoint) {
    super(component, serviceEndpoint);
    applicationContext = component.getApplicationContext();
}
105:
106: public void start() throws Exception {
107: super .start();
108: context = new EndpointComponentContext(this );
109: channel = context.getDeliveryChannel();
110: Object pojo = getBean();
111: if (pojo != null) {
112: injectBean(pojo);
113: ReflectionUtils.callLifecycleMethod(pojo,
114: PostConstruct.class);
115: }
116: beanType = pojo != null ? pojo.getClass() : createBean()
117: .getClass();
118: if (getMethodInvocationStrategy() == null) {
119: throw new IllegalArgumentException(
120: "No 'methodInvocationStrategy' property set");
121: }
122: }
123:
124: public void stop() throws Exception {
125: super .stop();
126: Object pojo = getBean();
127: if (pojo != null) {
128: ReflectionUtils.callLifecycleMethod(pojo, PreDestroy.class);
129: }
130: }
131:
132: public ApplicationContext getApplicationContext() {
133: return applicationContext;
134: }
135:
136: public void setApplicationContext(
137: ApplicationContext applicationContext)
138: throws BeansException {
139: this .applicationContext = applicationContext;
140: }
141:
142: public String getBeanName() {
143: return beanName;
144: }
145:
146: public void setBeanName(String beanName) {
147: this .beanName = beanName;
148: }
149:
150: public Object getBean() {
151: return bean;
152: }
153:
154: public void setBean(Object bean) {
155: this .bean = bean;
156: }
157:
158: /**
159: * @return the beanType
160: */
161: public Class<?> getBeanType() {
162: return beanType;
163: }
164:
165: /**
166: * @param beanType the beanType to set
167: */
168: public void setBeanType(Class<?> beanType) {
169: this .beanType = beanType;
170: }
171:
172: /**
173: * @return the beanClassName
174: */
175: public String getBeanClassName() {
176: return beanClassName;
177: }
178:
179: /**
180: * @param beanClassName the beanClassName to set
181: */
182: public void setBeanClassName(String beanClassName) {
183: this .beanClassName = beanClassName;
184: }
185:
186: public BeanInfo getBeanInfo() {
187: if (beanInfo == null) {
188: beanInfo = new BeanInfo(beanType,
189: getMethodInvocationStrategy());
190: beanInfo.introspect();
191: }
192: return beanInfo;
193: }
194:
195: public void setBeanInfo(BeanInfo beanInfo) {
196: this .beanInfo = beanInfo;
197: }
198:
199: public MethodInvocationStrategy getMethodInvocationStrategy() {
200: if (methodInvocationStrategy == null) {
201: methodInvocationStrategy = createMethodInvocationStrategy();
202: }
203: return methodInvocationStrategy;
204: }
205:
206: public void setMethodInvocationStrategy(
207: MethodInvocationStrategy methodInvocationStrategy) {
208: this .methodInvocationStrategy = methodInvocationStrategy;
209: }
210:
211: @Override
212: public void process(MessageExchange exchange) throws Exception {
213: if (exchange.getRole() == Role.CONSUMER) {
214: onConsumerExchange(exchange);
215: // Find or create the request for this provider exchange
216: } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
217: onProviderExchange(exchange);
218: } else {
219: throw new IllegalStateException("Unknown role: "
220: + exchange.getRole());
221: }
222: }
223:
224: protected void onProviderExchange(MessageExchange exchange)
225: throws Exception {
226: Object corId = getCorrelation(exchange);
227: Request req = requests.get(corId);
228: if (req == null) {
229: Object pojo = getBean();
230: if (pojo == null) {
231: pojo = createBean();
232: injectBean(pojo);
233: ReflectionUtils.callLifecycleMethod(pojo,
234: PostConstruct.class);
235: }
236: req = new Request(pojo, exchange);
237: requests.put(corId, req);
238: }
239: currentRequest.set(req);
240: synchronized (req) {
241: // If the bean implements MessageExchangeListener,
242: // just call the method
243: if (req.getBean() instanceof MessageExchangeListener) {
244: ((MessageExchangeListener) req.getBean())
245: .onMessageExchange(exchange);
246: } else {
247: // Exchange is finished
248: if (exchange.getStatus() == ExchangeStatus.DONE) {
249: return;
250: // Exchange has been aborted with an exception
251: } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
252: return;
253: // Fault message
254: } else if (exchange.getFault() != null) {
255: // TODO: find a way to send it back to the bean before setting the DONE status
256: done(exchange);
257: } else {
258: MethodInvocation invocation = getMethodInvocationStrategy()
259: .createInvocation(req.getBean(),
260: getBeanInfo(), exchange, this );
261: if (invocation == null) {
262: throw new UnknownMessageExchangeTypeException(
263: exchange, this );
264: }
265: try {
266: invocation.proceed();
267: } catch (Exception e) {
268: throw e;
269: } catch (Throwable throwable) {
270: throw new MethodInvocationFailedException(req
271: .getBean(), invocation, exchange, this ,
272: throwable);
273: }
274: if (exchange.getStatus() == ExchangeStatus.ERROR) {
275: send(exchange);
276: }
277: if (exchange.getFault() == null
278: && exchange.getMessage("out") == null) {
279: // TODO: handle MEP correctly (DONE should only be sent for InOnly)
280: done(exchange);
281: }
282: }
283: }
284: checkEndOfRequest(req, corId);
285: currentRequest.set(null);
286: }
287: }
288:
289: protected void onConsumerExchange(MessageExchange exchange)
290: throws Exception {
291: Object corId = exchange.getExchangeId();
292: Request req = requests.remove(corId);
293: if (req == null) {
294: throw new IllegalStateException(
295: "Receiving unknown consumer exchange: " + exchange);
296: }
297: currentRequest.set(req);
298: // If the bean implements MessageExchangeListener,
299: // just call the method
300: if (req.getBean() instanceof MessageExchangeListener) {
301: ((MessageExchangeListener) req.getBean())
302: .onMessageExchange(exchange);
303: } else {
304: Holder me = exchanges.get(exchange.getExchangeId());
305: if (me == null) {
306: throw new IllegalStateException(
307: "Consumer exchange not found");
308: }
309: me.set(exchange);
310: evaluateCallbacks(req);
311: }
312: checkEndOfRequest(req, corId);
313: currentRequest.set(null);
314: }
315:
316: protected Object getCorrelation(MessageExchange exchange)
317: throws MessagingException {
318: return getCorrelationExpression().evaluate(exchange,
319: exchange.getMessage("in"));
320: }
321:
322: protected Object createBean() throws ClassNotFoundException,
323: InstantiationException, IllegalAccessException {
324: if (beanName == null && beanType == null) {
325: throw new IllegalArgumentException(
326: "Property 'beanName' has not been set!");
327: }
328: if (beanType == null && beanClassName != null) {
329: beanType = Class.forName(beanClassName, true,
330: getServiceUnit().getConfigurationClassLoader());
331: }
332: if (beanType != null) {
333: return beanType.newInstance();
334: } else if (beanName == null) {
335: throw new IllegalArgumentException(
336: "Property 'beanName', 'beanType' or 'beanClassName' must be set!");
337: } else if (applicationContext == null) {
338: throw new IllegalArgumentException(
339: "Property 'beanName' specified, but no BeanFactory set!");
340: } else {
341: Object answer = applicationContext.getBean(beanName);
342: if (answer == null) {
343: throw new NoSuchBeanException(beanName, this );
344: }
345: return answer;
346: }
347: }
348:
349: protected MethodInvocationStrategy createMethodInvocationStrategy() {
350: DefaultMethodInvocationStrategy st = new DefaultMethodInvocationStrategy();
351: st.loadDefaultRegistry();
352: return st;
353: }
354:
355: /**
356: * A strategy method to allow implementations to perform some custom JBI based injection of the POJO
357: *
358: * @param target the bean to be injected
359: */
360: protected void injectBean(final Object target) {
361: final PojoContext ctx = new PojoContext();
362: final DeliveryChannel ch = ctx.channel;
363: // Inject fields
364: ReflectionUtils.doWithFields(target.getClass(),
365: new ReflectionUtils.FieldCallback() {
366: public void doWith(Field f)
367: throws IllegalArgumentException,
368: IllegalAccessException {
369: ExchangeTarget et = f
370: .getAnnotation(ExchangeTarget.class);
371: if (et != null) {
372: ReflectionUtils.setField(f, target,
373: new DestinationImpl(et.uri(),
374: BeanEndpoint.this ));
375: }
376: if (f.getAnnotation(Resource.class) != null) {
377: if (ComponentContext.class
378: .isAssignableFrom(f.getType())) {
379: ReflectionUtils
380: .setField(f, target, ctx);
381: } else if (DeliveryChannel.class
382: .isAssignableFrom(f.getType())) {
383: ReflectionUtils.setField(f, target, ch);
384: }
385: }
386: }
387: });
388: }
389:
390: protected void evaluateCallbacks(final Request req) {
391: final Object obj = req.getBean();
392: ReflectionUtils.doWithMethods(obj.getClass(),
393: new ReflectionUtils.MethodCallback() {
394: @SuppressWarnings("unchecked")
395: public void doWith(Method method)
396: throws IllegalArgumentException,
397: IllegalAccessException {
398: if (method.getAnnotation(Callback.class) != null) {
399: try {
400: Expression e = ExpressionFactory
401: .createExpression(method
402: .getAnnotation(
403: Callback.class)
404: .condition());
405: JexlContext jc = JexlHelper
406: .createContext();
407: jc.getVars().put("this", obj);
408: Object r = e.evaluate(jc);
409: if (!(r instanceof Boolean)) {
410: throw new RuntimeException(
411: "Expression did not returned a boolean value but: "
412: + r);
413: }
414: Boolean oldVal = req.getCallbacks()
415: .get(method);
416: Boolean newVal = (Boolean) r;
417: if ((oldVal == null || !oldVal)
418: && newVal) {
419: req.getCallbacks().put(method,
420: newVal);
421: method.invoke(obj, new Object[0]);
422: // TODO: handle return value and sent it as the answer
423: }
424: } catch (Exception e) {
425: throw new RuntimeException(
426: "Unable to invoke callback", e);
427: }
428: }
429: }
430: });
431: }
432:
433: /**
434: * Used by POJOs acting as a consumer
435: * @param uri
436: * @param message
437: * @return
438: */
439: public Future<NormalizedMessage> send(String uri,
440: NormalizedMessage message) {
441: try {
442: InOut me = getExchangeFactory().createInOutExchange();
443: URIResolver.configureExchange(me, getServiceUnit()
444: .getComponent().getComponentContext(), uri);
445: MessageUtil.transferTo(message, me, "in");
446: final Holder h = new Holder();
447: requests.put(me.getExchangeId(), currentRequest.get());
448: exchanges.put(me.getExchangeId(), h);
449: BeanEndpoint.this .send(me);
450: return h;
451: } catch (Exception e) {
452: throw new RuntimeException(e);
453: }
454: }
455:
456: protected void checkEndOfRequest(Request request, Object corId) {
457: if (request.getExchange().getStatus() != ExchangeStatus.ACTIVE) {
458: ReflectionUtils.callLifecycleMethod(request.getBean(),
459: PreDestroy.class);
460: //request.setBean(null);
461: //request.setExchange(null);
462: requests.remove(corId);
463: }
464: }
465:
466: /**
467: * @return the correlationExpression
468: */
469: public org.apache.servicemix.expression.Expression getCorrelationExpression() {
470: if (correlationExpression == null) {
471: // Find correlation expression
472: Correlation cor = beanType.getAnnotation(Correlation.class);
473: if (cor != null) {
474: if (cor.property() != null) {
475: correlationExpression = new PropertyExpression(cor
476: .property());
477: } else if (cor.xpath() != null) {
478: correlationExpression = new JAXPStringXPathExpression(
479: cor.xpath());
480: }
481: }
482: if (correlationExpression == null) {
483: correlationExpression = new org.apache.servicemix.expression.Expression() {
484: public Object evaluate(MessageExchange exchange,
485: NormalizedMessage message)
486: throws MessagingException {
487: return exchange.getExchangeId();
488: }
489: };
490: }
491: }
492: return correlationExpression;
493: }
494:
495: /**
496: * @param correlationExpression the correlationExpression to set
497: */
498: public void setCorrelationExpression(
499: org.apache.servicemix.expression.Expression correlationExpression) {
500: this .correlationExpression = correlationExpression;
501: }
502:
503: protected class PojoContext implements ComponentContext {
504:
505: private DeliveryChannel channel = new PojoChannel();
506:
507: public ServiceEndpoint activateEndpoint(QName qName, String s)
508: throws JBIException {
509: return context.activateEndpoint(qName, s);
510: }
511:
512: public void deactivateEndpoint(ServiceEndpoint serviceEndpoint)
513: throws JBIException {
514: context.deactivateEndpoint(serviceEndpoint);
515: }
516:
517: public void registerExternalEndpoint(
518: ServiceEndpoint serviceEndpoint) throws JBIException {
519: context.registerExternalEndpoint(serviceEndpoint);
520: }
521:
522: public void deregisterExternalEndpoint(
523: ServiceEndpoint serviceEndpoint) throws JBIException {
524: context.deregisterExternalEndpoint(serviceEndpoint);
525: }
526:
527: public ServiceEndpoint resolveEndpointReference(
528: DocumentFragment documentFragment) {
529: return context.resolveEndpointReference(documentFragment);
530: }
531:
532: public String getComponentName() {
533: return context.getComponentName();
534: }
535:
536: public DeliveryChannel getDeliveryChannel()
537: throws MessagingException {
538: return channel;
539: }
540:
541: public ServiceEndpoint getEndpoint(QName qName, String s) {
542: return context.getEndpoint(qName, s);
543: }
544:
545: public Document getEndpointDescriptor(
546: ServiceEndpoint serviceEndpoint) throws JBIException {
547: return context.getEndpointDescriptor(serviceEndpoint);
548: }
549:
550: public ServiceEndpoint[] getEndpoints(QName qName) {
551: return context.getEndpoints(qName);
552: }
553:
554: public ServiceEndpoint[] getEndpointsForService(QName qName) {
555: return context.getEndpointsForService(qName);
556: }
557:
558: public ServiceEndpoint[] getExternalEndpoints(QName qName) {
559: return context.getExternalEndpoints(qName);
560: }
561:
562: public ServiceEndpoint[] getExternalEndpointsForService(
563: QName qName) {
564: return context.getExternalEndpointsForService(qName);
565: }
566:
567: public String getInstallRoot() {
568: return context.getInstallRoot();
569: }
570:
571: public Logger getLogger(String s, String s1)
572: throws MissingResourceException, JBIException {
573: return context.getLogger(s, s1);
574: }
575:
576: public MBeanNames getMBeanNames() {
577: return context.getMBeanNames();
578: }
579:
580: public MBeanServer getMBeanServer() {
581: return context.getMBeanServer();
582: }
583:
584: public InitialContext getNamingContext() {
585: return context.getNamingContext();
586: }
587:
588: public Object getTransactionManager() {
589: return context.getTransactionManager();
590: }
591:
592: public String getWorkspaceRoot() {
593: return context.getWorkspaceRoot();
594: }
595: }
596:
597: protected class PojoChannel implements DeliveryChannel {
598:
599: public void close() throws MessagingException {
600: BeanEndpoint.this .channel.close();
601: }
602:
603: public MessageExchangeFactory createExchangeFactory() {
604: return BeanEndpoint.this .channel.createExchangeFactory();
605: }
606:
607: public MessageExchangeFactory createExchangeFactory(QName qName) {
608: return BeanEndpoint.this .channel
609: .createExchangeFactory(qName);
610: }
611:
612: public MessageExchangeFactory createExchangeFactoryForService(
613: QName qName) {
614: return BeanEndpoint.this .channel
615: .createExchangeFactoryForService(qName);
616: }
617:
618: public MessageExchangeFactory createExchangeFactory(
619: ServiceEndpoint serviceEndpoint) {
620: return BeanEndpoint.this .channel
621: .createExchangeFactory(serviceEndpoint);
622: }
623:
624: public MessageExchange accept() throws MessagingException {
625: return BeanEndpoint.this .channel.accept();
626: }
627:
628: public MessageExchange accept(long l) throws MessagingException {
629: return BeanEndpoint.this .channel.accept(l);
630: }
631:
632: public void send(MessageExchange messageExchange)
633: throws MessagingException {
634: if (messageExchange.getRole() == MessageExchange.Role.CONSUMER
635: && messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
636: requests.put(messageExchange.getExchangeId(),
637: currentRequest.get());
638: }
639: BeanEndpoint.this .channel.send(messageExchange);
640: }
641:
642: public boolean sendSync(MessageExchange messageExchange)
643: throws MessagingException {
644: return BeanEndpoint.this .channel.sendSync(messageExchange);
645: }
646:
647: public boolean sendSync(MessageExchange messageExchange, long l)
648: throws MessagingException {
649: return BeanEndpoint.this.channel.sendSync(messageExchange,
650: l);
651: }
652:
653: }
654: }
|