001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jms;
018:
019: import java.util.Map;
020: import java.util.Properties;
021:
022: import javax.jbi.component.ComponentContext;
023: import javax.jbi.messaging.DeliveryChannel;
024: import javax.jbi.messaging.ExchangeStatus;
025: import javax.jbi.messaging.Fault;
026: import javax.jbi.messaging.MessageExchange;
027: import javax.jbi.messaging.NormalizedMessage;
028: import javax.jms.Connection;
029: import javax.jms.ConnectionFactory;
030: import javax.jms.Message;
031: import javax.jms.Session;
032: import javax.naming.InitialContext;
033: import javax.naming.NamingException;
034:
035: import org.apache.commons.logging.Log;
036: import org.apache.commons.logging.LogFactory;
037: import org.apache.servicemix.JbiConstants;
038: import org.apache.servicemix.common.BaseLifeCycle;
039: import org.apache.servicemix.common.EndpointComponentContext;
040: import org.apache.servicemix.common.ExchangeProcessor;
041: import org.apache.servicemix.soap.Context;
042: import org.apache.servicemix.soap.SoapFault;
043: import org.apache.servicemix.soap.SoapHelper;
044: import org.apache.servicemix.soap.marshalers.SoapMessage;
045: import org.apache.servicemix.store.Store;
046: import org.apache.servicemix.store.memory.MemoryStoreFactory;
047:
048: public abstract class AbstractJmsProcessor implements ExchangeProcessor {
049:
050: public static final String STYLE_QUEUE = "queue";
051: public static final String STYLE_TOPIC = "topic";
052:
053: public static final String CONTENT_TYPE = "MimeContentType";
054:
055: protected final transient Log log = LogFactory.getLog(getClass());
056:
057: protected JmsEndpoint endpoint;
058: protected Connection connection;
059: protected SoapHelper soapHelper;
060: protected ComponentContext context;
061: protected DeliveryChannel channel;
062:
063: protected Store store;
064:
065: public AbstractJmsProcessor(JmsEndpoint endpoint) throws Exception {
066: this .endpoint = endpoint;
067: this .soapHelper = new SoapHelper(endpoint);
068: this .context = new EndpointComponentContext(endpoint);
069: this .channel = context.getDeliveryChannel();
070: }
071:
072: public void start() throws Exception {
073: try {
074: InitialContext ctx = getInitialContext();
075: ConnectionFactory connectionFactory = null;
076: connectionFactory = getConnectionFactory(ctx);
077: connection = connectionFactory.createConnection();
078: connection.start();
079:
080: // set up the Store
081: if (endpoint.store != null) {
082: store = endpoint.store;
083: } else if (endpoint.storeFactory != null) {
084: store = endpoint.storeFactory.open(endpoint
085: .getService().toString()
086: + endpoint.getEndpoint());
087: } else {
088: store = new MemoryStoreFactory().open(endpoint
089: .getService().toString()
090: + endpoint.getEndpoint());
091: }
092:
093: doStart(ctx);
094: } catch (Exception e) {
095: try {
096: stop();
097: } catch (Exception inner) {
098: // TODO: log
099: }
100: throw e;
101: }
102: }
103:
104: protected ConnectionFactory getConnectionFactory(InitialContext ctx)
105: throws NamingException {
106: // First check configured connectionFactory on the endpoint
107: ConnectionFactory connectionFactory = endpoint
108: .getConnectionFactory();
109: // Then, check for jndi connection factory name on the endpoint
110: if (connectionFactory == null
111: && endpoint.getJndiConnectionFactoryName() != null) {
112: connectionFactory = (ConnectionFactory) ctx.lookup(endpoint
113: .getJndiConnectionFactoryName());
114: }
115: // Check for a configured connectionFactory on the configuration
116: if (connectionFactory == null
117: && endpoint.getConfiguration().getConnectionFactory() != null) {
118: connectionFactory = endpoint.getConfiguration()
119: .getConnectionFactory();
120: }
121: // Check for jndi connection factory name on the configuration
122: if (connectionFactory == null
123: && endpoint.getConfiguration()
124: .getJndiConnectionFactoryName() != null) {
125: connectionFactory = (ConnectionFactory) ctx.lookup(endpoint
126: .getConfiguration().getJndiConnectionFactoryName());
127: }
128: return connectionFactory;
129: }
130:
131: protected InitialContext getInitialContext() throws NamingException {
132: Properties props = new Properties();
133: if (endpoint.getInitialContextFactory() != null
134: && endpoint.getJndiProviderURL() != null) {
135: props.put(InitialContext.INITIAL_CONTEXT_FACTORY, endpoint
136: .getInitialContextFactory());
137: props.put(InitialContext.PROVIDER_URL, endpoint
138: .getJndiProviderURL());
139: return new InitialContext(props);
140: } else if (endpoint.getConfiguration()
141: .getJndiInitialContextFactory() != null
142: && endpoint.getConfiguration().getJndiProviderUrl() != null) {
143: props.put(InitialContext.INITIAL_CONTEXT_FACTORY, endpoint
144: .getConfiguration().getJndiInitialContextFactory());
145: props.put(InitialContext.PROVIDER_URL, endpoint
146: .getConfiguration().getJndiProviderUrl());
147: return new InitialContext(props);
148: } else {
149: BaseLifeCycle lf = (BaseLifeCycle) endpoint
150: .getServiceUnit().getComponent().getLifeCycle();
151: return lf.getContext().getNamingContext();
152: }
153: }
154:
155: protected Store getStore() {
156: return store;
157: }
158:
159: protected void doStart(InitialContext ctx) throws Exception {
160: }
161:
162: public void stop() throws Exception {
163: try {
164: doStop();
165: if (connection != null) {
166: connection.close();
167: }
168: } finally {
169: connection = null;
170: }
171: }
172:
173: protected void doStop() throws Exception {
174: }
175:
176: protected Context createContext() {
177: return soapHelper.createContext();
178: }
179:
180: protected Message fromNMS(NormalizedMessage nm, Session session)
181: throws Exception {
182: SoapMessage soap = new SoapMessage();
183: soapHelper.getJBIMarshaler().fromNMS(soap, nm);
184: Map headers = (Map) nm
185: .getProperty(JbiConstants.PROTOCOL_HEADERS);
186: return endpoint.getMarshaler().toJMS(soap, headers, session);
187: }
188:
189: protected MessageExchange toNMS(Message message, Context ctx)
190: throws Exception {
191: SoapMessage soap = endpoint.getMarshaler().toSOAP(message);
192: ctx.setInMessage(soap);
193: ctx.setProperty(Message.class.getName(), message);
194: MessageExchange exchange = soapHelper.onReceive(ctx);
195: // TODO: copy protocol messages
196: //inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(message));
197: return exchange;
198: }
199:
200: protected Message fromNMSResponse(MessageExchange exchange,
201: Context ctx, Session session) throws Exception {
202: Message response = null;
203: if (exchange.getStatus() == ExchangeStatus.ERROR) {
204: // marshal error
205: Exception e = exchange.getError();
206: if (e == null) {
207: e = new Exception("Unkown error");
208: }
209: response = endpoint.getMarshaler().toJMS(e, session);
210: } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
211: // check for fault
212: Fault jbiFault = exchange.getFault();
213: if (jbiFault != null) {
214: // convert fault to SOAP message
215: SoapFault fault = new SoapFault(SoapFault.RECEIVER,
216: null, null, null, jbiFault.getContent());
217: SoapMessage soapFault = soapHelper.onFault(ctx, fault);
218: Map headers = (Map) jbiFault
219: .getProperty(JbiConstants.PROTOCOL_HEADERS);
220: response = endpoint.getMarshaler().toJMS(soapFault,
221: headers, session);
222: } else {
223: NormalizedMessage outMsg = exchange.getMessage("out");
224: if (outMsg != null) {
225: SoapMessage out = soapHelper.onReply(ctx, outMsg);
226: Map headers = (Map) outMsg
227: .getProperty(JbiConstants.PROTOCOL_HEADERS);
228: response = endpoint.getMarshaler().toJMS(out,
229: headers, session);
230: }
231: }
232: }
233: return response;
234: }
235:
236: }
|