001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.bpmscript.jbi;
018:
019: import java.lang.reflect.InvocationTargetException;
020: import java.lang.reflect.Method;
021: import java.util.HashMap;
022: import java.util.Map;
023:
024: import javax.jbi.JBIException;
025: import javax.jbi.component.ComponentContext;
026: import javax.jbi.management.DeploymentException;
027: import javax.jbi.messaging.DeliveryChannel;
028: import javax.jbi.messaging.ExchangeStatus;
029: import javax.jbi.messaging.InOptionalOut;
030: import javax.jbi.messaging.InOut;
031: import javax.jbi.messaging.MessageExchange;
032: import javax.jbi.messaging.MessageExchangeFactory;
033: import javax.jbi.messaging.MessagingException;
034: import javax.jbi.messaging.NormalizedMessage;
035: import javax.jbi.messaging.MessageExchange.Role;
036: import javax.jbi.servicedesc.ServiceEndpoint;
037:
038: import org.apache.servicemix.common.BaseLifeCycle;
039: import org.apache.servicemix.common.Endpoint;
040: import org.apache.servicemix.common.ExchangeProcessor;
041: import org.apache.servicemix.components.util.xstream.XStreamMarshaler;
042: import org.apache.servicemix.jbi.messaging.DefaultMarshaler;
043: import org.apache.servicemix.jbi.messaging.PojoMarshaler;
044: import org.bpmscript.jbi.component.EndpointDeliveryChannel;
045:
046: /**
047: * @org.apache.xbean.XBean element="endpoint"
048: */
049: public class PojoServiceEndpoint extends Endpoint implements
050: ExchangeProcessor {
051:
052: private ServiceEndpoint activated;
053: private DeliveryChannel channel;
054: private MessageExchangeFactory exchangeFactory;
055: private Object pojo;
056: private Map<String, Method> methodMap = new HashMap<String, Method>();
057: private PojoMarshaler marshaler = new DefaultMarshaler();
058:
059: /* (non-Javadoc)
060: * @see org.apache.servicemix.common.Endpoint#getRole()
061: */
062: public Role getRole() {
063: return Role.PROVIDER;
064: }
065:
066: public void activate() throws Exception {
067: logger = this .serviceUnit.getComponent().getLogger();
068: ComponentContext ctx = getServiceUnit().getComponent()
069: .getComponentContext();
070: channel = new EndpointDeliveryChannel(ctx.getDeliveryChannel(),
071: this );
072: exchangeFactory = channel.createExchangeFactory();
073: activated = ctx.activateEndpoint(service, endpoint);
074:
075: Method[] methods = pojo.getClass().getMethods();
076: for (Method method : methods) {
077: if (method.getDeclaringClass() != Object.class) {
078: Method existingMethod = methodMap.get(method.getName());
079: if (existingMethod != null) {
080: throw new JBIException(
081: "overloaded methods not supported. clashing methods are "
082: + existingMethod + " and " + method);
083: }
084: methodMap.put(method.getName(), method);
085: }
086: }
087: if (pojo instanceof IDeliveryChannelAware) {
088: IDeliveryChannelAware deliveryChannelAware = (IDeliveryChannelAware) pojo;
089: deliveryChannelAware.setDeliveryChannel(channel);
090: }
091: if (pojo instanceof IComponentAware) {
092: IComponentAware componentAware = (IComponentAware) pojo;
093: componentAware
094: .setComponent(getServiceUnit().getComponent());
095: }
096: start();
097: }
098:
099: public void deactivate() throws Exception {
100: stop();
101: ServiceEndpoint ep = activated;
102: activated = null;
103: ComponentContext ctx = getServiceUnit().getComponent()
104: .getComponentContext();
105: ctx.deactivateEndpoint(ep);
106: }
107:
108: public ExchangeProcessor getProcessor() {
109: return this ;
110: }
111:
112: public void validate() throws DeploymentException {
113: }
114:
115: protected void send(MessageExchange me) throws MessagingException {
116: if (me.getRole() == MessageExchange.Role.CONSUMER
117: && me.getStatus() == ExchangeStatus.ACTIVE) {
118: BaseLifeCycle lf = (BaseLifeCycle) getServiceUnit()
119: .getComponent().getLifeCycle();
120: lf.sendConsumerExchange(me, (Endpoint) this );
121: } else {
122: channel.send(me);
123: }
124: }
125:
126: protected void done(MessageExchange me) throws MessagingException {
127: me.setStatus(ExchangeStatus.DONE);
128: send(me);
129: }
130:
131: protected void fail(MessageExchange me, Exception error)
132: throws MessagingException {
133: me.setError(error);
134: send(me);
135: }
136:
137: public void start() throws Exception {
138: }
139:
140: public void stop() {
141: }
142:
143: public void process(MessageExchange exchange) throws Exception {
144: // TODO: make this friendlier
145: if (marshaler instanceof XStreamMarshaler) {
146: XStreamMarshaler xstreamMarshaler = (XStreamMarshaler) marshaler;
147: xstreamMarshaler.getXStream().setClassLoader(
148: this .getClass().getClassLoader());
149: }
150: if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
151: NormalizedMessage inMessage = exchange.getMessage("in");
152: String methodName = (String) inMessage
153: .getProperty("method");
154: if (methodName == null) {
155: fail(exchange, new NullPointerException(
156: "method property should not be null"));
157: return;
158: }
159: Method method = methodMap.get(methodName);
160: if (method == null) {
161: fail(exchange, new NoSuchMethodException(methodName));
162: return;
163: }
164: logger.info(methodName + " called");
165: try {
166: Object[] args = (Object[]) marshaler.unmarshal(
167: exchange, inMessage);
168: Object result = method.invoke(pojo, args);
169: if (exchange instanceof InOut) {
170: InOut inOut = (InOut) exchange;
171: NormalizedMessage outMessage = inOut
172: .createMessage();
173: marshaler.marshal(exchange, outMessage, result);
174: inOut.setOutMessage(outMessage);
175: send(exchange);
176: } else if (exchange instanceof InOptionalOut) {
177: InOptionalOut inOut = (InOptionalOut) exchange;
178: NormalizedMessage outMessage = inOut
179: .createMessage();
180: marshaler.marshal(exchange, outMessage, result);
181: inOut.setOutMessage(outMessage);
182: send(exchange);
183: } else {
184: done(exchange);
185: }
186: logger.info(methodName + " completed");
187: } catch (ClassCastException e) {
188: logger.error(e, e);
189: fail(exchange, e);
190: } catch (IllegalArgumentException e) {
191: logger.error(e, e);
192: fail(exchange, e);
193: } catch (IllegalAccessException e) {
194: logger.error(e, e);
195: fail(exchange, e);
196: } catch (InvocationTargetException e) {
197: logger.error(e, e);
198: fail(exchange, e);
199: }
200: }
201: }
202:
203: public void setMarshaler(PojoMarshaler marshaler) {
204: this .marshaler = marshaler;
205: }
206:
207: public void setPojo(Object pojo) {
208: this.pojo = pojo;
209: }
210:
211: }
|