001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2006 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: ProviderProcessor.java 1206 2006-09-23 03:51:32Z elu $
023: */
024: package com.bostechcorp.cbesb.runtime.component.jms.processors;
025:
026: import java.io.ByteArrayInputStream;
027: import java.io.ByteArrayOutputStream;
028: import java.io.IOException;
029: import java.io.StringReader;
030: import java.util.Vector;
031:
032: import javax.jbi.messaging.MessageExchange;
033: import javax.jbi.messaging.NormalizedMessage;
034: import javax.jms.BytesMessage;
035: import javax.jms.JMSException;
036: import javax.jms.Message;
037: import javax.jms.TextMessage;
038: import javax.xml.namespace.QName;
039: import javax.xml.transform.Source;
040: import javax.xml.transform.Transformer;
041: import javax.xml.transform.TransformerConfigurationException;
042: import javax.xml.transform.TransformerException;
043: import javax.xml.transform.TransformerFactory;
044: import javax.xml.transform.dom.DOMResult;
045: import javax.xml.transform.dom.DOMSource;
046:
047: import org.apache.commons.logging.Log;
048: import org.apache.commons.logging.LogFactory;
049: import org.w3c.dom.Node;
050:
051: import com.bostechcorp.cbesb.common.runtime.ConfigurationException;
052: import com.bostechcorp.cbesb.common.runtime.DataContentException;
053: import com.bostechcorp.cbesb.common.runtime.ResourcesConnectionException;
054: import com.bostechcorp.cbesb.common.util.Dom;
055: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbProviderProcessor;
056: import com.bostechcorp.cbesb.runtime.ccsl.lib.ExternalInput;
057: import com.bostechcorp.cbesb.runtime.ccsl.lib.NormalizedMessageUtil;
058: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.NormalizedMessageHandler;
059: import com.bostechcorp.cbesb.runtime.component.jms.JMSEndpoint;
060: import com.bostechcorp.cbesb.runtime.component.jms.JMSMarshaler;
061: import com.bostechcorp.cbesb.runtime.jms.JMSConsumerHandler;
062: import com.bostechcorp.cbesb.runtime.jms.JMSHandler;
063: import com.bostechcorp.cbesb.runtime.jms.JMSProducerHandler;
064:
065: public class ProviderProcessor extends CbProviderProcessor {
066:
067: protected final transient Log logger = LogFactory
068: .getLog(getClass());
069: protected static final String QUERY_REQUEST = "QueryRequest";
070: protected static final String QUERY_RESPONSE = "QueryResponse";
071: protected static final String QUERY_NAMESPACE = "http://cbesb.bostechcorp.com/jms/1.0";
072: protected static final String EXPRESSION = "expression";
073: protected static final String SUCCESS = "success";
074: protected static final String MESSAGE = "message";
075:
076: private JMSEndpoint endpoint;
077:
078: private JMSHandler jmsHandler;
079:
080: public ProviderProcessor(JMSEndpoint endpoint) {
081: super (endpoint);
082: this .endpoint = endpoint;
083: }
084:
085: private void initProducerHandler() throws ConfigurationException,
086: ResourcesConnectionException, JMSException {
087: logger.debug("JMS init handler");
088: String initialContextFactory = null;
089: String providerUrl = null;
090: String connectionFactoryName = null;
091: String userName = null;
092: String password = null;
093:
094: if (endpoint.getJndiInitialContextFactory() != null)
095: initialContextFactory = endpoint
096: .getJndiInitialContextFactory();
097: if (endpoint.getJndiProviderUrl() != null)
098: providerUrl = endpoint.getJndiProviderUrl();
099:
100: if (endpoint.getJndiConnectionFactoryName() != null)
101: connectionFactoryName = endpoint
102: .getJndiConnectionFactoryName();
103:
104: jmsHandler = new JMSProducerHandler(initialContextFactory,
105: providerUrl, connectionFactoryName, userName, password,
106: endpoint.getDestinationStyle(), endpoint
107: .getTargetDestinationName(), endpoint
108: .getReplyDestinationName());
109:
110: }
111:
112: private void initConsumerHandler() throws ConfigurationException,
113: ResourcesConnectionException, JMSException {
114: logger.debug("JMS init handler");
115: String initialContextFactory = null;
116: String providerUrl = null;
117: String connectionFactoryName = null;
118: String userName = null;
119: String password = null;
120:
121: if (endpoint.getJndiInitialContextFactory() != null)
122: initialContextFactory = endpoint
123: .getJndiInitialContextFactory();
124: if (endpoint.getJndiProviderUrl() != null)
125: providerUrl = endpoint.getJndiProviderUrl();
126:
127: if (endpoint.getJndiConnectionFactoryName() != null)
128: connectionFactoryName = endpoint
129: .getJndiConnectionFactoryName();
130:
131: jmsHandler = new JMSConsumerHandler(initialContextFactory,
132: providerUrl, connectionFactoryName, userName, password,
133: endpoint.getDestinationStyle(), endpoint
134: .getTargetDestinationName(), endpoint
135: .getReplyDestinationName(), false, null);
136:
137: }
138:
139: @Override
140: public void processInMessage(QName service, QName operation,
141: NormalizedMessage in, MessageExchange exchange)
142: throws ConfigurationException,
143: ResourcesConnectionException, JMSException,
144: TransformerConfigurationException, IOException,
145: TransformerException {
146:
147: // synchronized (jmsHandler){
148: if (jmsHandler == null)
149: initProducerHandler();
150: // }
151: logger.info("JMS provider processInMessage");
152:
153: // Convert to JMS message
154: // System.out.println("writestyle="+endpoint.getWriteStyle());
155: // System.out.println("charset="+endpoint.getCharset());
156:
157: Message sendMessage = JMSMarshaler.normalizedToJMS(in,
158: exchange, endpoint.getWriteStyle(), endpoint
159: .getCharset(), jmsHandler.getSession(),
160: endpoint.isSavaMetadata());
161: // JMSMarshaler.addMetatataToJms(sendMessage,exchange);
162: // Send in-only message
163: jmsHandler.send(null, 0, sendMessage);
164: }
165:
166: /*
167: * (non-Javadoc)
168: *
169: * @see com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.BaseProviderProcessor#processInOutMessage(javax.xml.namespace.QName,
170: * javax.xml.namespace.QName, javax.jbi.messaging.NormalizedMessage,
171: * javax.jbi.messaging.NormalizedMessage, boolean)
172: */
173: public boolean processInOutMessage(QName service, QName operation,
174: NormalizedMessage in, NormalizedMessage out,
175: boolean optionalOut, MessageExchange exchange)
176: throws Exception {
177:
178: logger.info("JMS provider processInOutMessage");
179:
180: if (jmsHandler == null)
181: if (endpoint.isEnableQuery())
182: initConsumerHandler();
183: else
184: initProducerHandler();
185:
186: logger.info("Enable query:" + endpoint.isEnableQuery());
187: if (endpoint.isEnableQuery()) {
188:
189: Vector<String> filterList = createSelector(in);
190: for (String filter : filterList) {
191: Message pooledMessage = jmsHandler.receiveFiltered(
192: filter, false, false);
193: if (pooledMessage != null) {
194: ExternalInput input = jmsMessageToExternalInput(pooledMessage);
195: // JMSMarshaler.addMetatataToNMExchange(exchange,
196: // pooledMessage);
197: input.populateMessage(out,
198: getProviderSvcDescHandlerInstance());
199:
200: }
201:
202: else {
203: StringBuffer buffer = new StringBuffer("");
204: appendQueryResponseOpeningTag(buffer);
205: appendElement(buffer);
206: appendQueryResponseClosingTag(buffer);
207: // setOutMessageContent(out, buffer);
208: NormalizedMessageUtil.addXmlRecord(out, buffer
209: .toString());
210:
211: }
212: }
213:
214: } else // If EnableQuery is set to False, the current Provider logic remains
215: {
216:
217: // Convert to JMS message
218: Message sendMessage = JMSMarshaler.normalizedToJMS(in,
219: exchange, endpoint.getWriteStyle(), endpoint
220: .getCharset(), jmsHandler.getSession(),
221: endpoint.isSavaMetadata());
222: // JMSMarshaler.addMetatataToJms(sendMessage,exchange);
223: // Send and get reply
224: long replyTimeout = 60000;
225: if (endpoint.getReplyTimeout() != null) {
226: long wsdlTimeout = (new Long(endpoint.getReplyTimeout()))
227: .longValue();
228: if (wsdlTimeout > 0)
229: replyTimeout = wsdlTimeout;
230: }
231: logger.debug("reply timeout:" + replyTimeout);
232: Message reply = jmsHandler.send(endpoint
233: .getReplyDestinationName(), replyTimeout,
234: sendMessage);
235:
236: if (reply != null) {
237: logger.debug("create out message");
238:
239: ExternalInput input = jmsMessageToExternalInput(reply);
240: // JMSMarshaler.addMetatataToNMExchange(exchange, reply);
241: input.populateMessage(out,
242: getProviderSvcDescHandlerInstance());
243: }
244: }
245: return true;
246: }
247:
248: private ExternalInput jmsMessageToExternalInput(Message jmsMessage)
249: throws JMSException, Exception {
250: ExternalInput input = null;
251: if (jmsMessage != null) {
252: if (jmsMessage instanceof TextMessage) {
253: TextMessage tm = (TextMessage) jmsMessage;
254: input = new ExternalInput(
255: new StringReader(tm.getText()), endpoint
256: .getCharset(), endpoint.getReadStyle(),
257: endpoint.getRecordType(), 0);
258: } else if (jmsMessage instanceof BytesMessage) {
259: BytesMessage bm = (BytesMessage) jmsMessage;
260: ByteArrayOutputStream baos = new ByteArrayOutputStream();
261: byte[] buf = new byte[2048];
262: for (int l = 0; (l = bm.readBytes(buf)) > 0;)
263: baos.write(buf, 0, l);
264: input = new ExternalInput(new ByteArrayInputStream(baos
265: .toByteArray()), endpoint.getCharset(),
266: endpoint.getReadStyle(), endpoint
267: .getRecordType(), 0);
268: } else {
269: throw new Exception("unsupported JMS message type: "
270: + jmsMessage.getClass().getName());
271: }
272: }
273: return input;
274: }
275:
276: private Vector<String> createSelector(NormalizedMessage in)
277: throws DataContentException {
278:
279: NormalizedMessageHandler nmh = new NormalizedMessageHandler(in);
280: Vector<String> result = new Vector<String>();
281: String queryExpression = endpoint.getQueryExpression().trim();
282:
283: for (int i = 0; i < nmh.getRecordCount(); i++) {
284: Source src = nmh.getRecordAtIndex(i);
285:
286: DOMSource dom = null;
287: if (src instanceof DOMSource) {
288: //logger.debug("DOMSource");
289: dom = (DOMSource) src;
290: } else {
291:
292: logger.debug("it is not DOMSource,need to transform");
293:
294: DOMResult dr = new DOMResult();
295: TransformerFactory tf = TransformerFactory
296: .newInstance();
297: try {
298: Transformer t = tf.newTransformer();
299: t.transform(src, dr);
300: } catch (TransformerException e) {
301: throw new DataContentException(
302: "Failed to convert the source in Normalized Message into DOMSource. - "
303: + e.getMessage(),
304: "Verify message content is well-formed XML.",
305: e);
306: }
307: dom = new DOMSource(dr.getNode());
308: }
309: String filter = "";
310:
311: //logger.debug(dom.getNode().getFirstChild().getNodeName());
312:
313: if (QUERY_REQUEST.equals(dom.getNode().getFirstChild()
314: .getNodeName())
315: && QUERY_NAMESPACE.equals(dom.getNode()
316: .getFirstChild().getNamespaceURI())) {
317:
318: Node request = dom.getNode().getFirstChild();
319: Node expression = Dom.findChild(request, EXPRESSION,
320: false);
321:
322: if (expression != null) {
323: filter = Dom.getTextContent(expression);
324: if (filter == null)
325: filter = "";
326: }
327: logger.info("filter=" + filter);
328:
329: //apend QueryExpression defined in endpoint
330: if (queryExpression != null
331: && queryExpression.length() > 0) {
332: filter += " and " + queryExpression;
333: }
334:
335: result.add(filter);
336: }
337: }
338: return result;
339: }
340:
341: private void appendQueryResponseOpeningTag(StringBuffer buffer) {
342: buffer.append("<" + QUERY_RESPONSE + " xmlns=\""
343: + QUERY_NAMESPACE + "\">");
344: }
345:
346: private void appendElement(StringBuffer buffer) {
347: buffer.append("<" + SUCCESS + ">");
348: buffer.append("true");
349: buffer.append("</" + SUCCESS + ">");
350: buffer.append("<" + MESSAGE + ">");
351: buffer.append("No JMS message received.");
352: buffer.append("</" + MESSAGE + ">");
353:
354: buffer.append("\r\n");
355: }
356:
357: private void appendQueryResponseClosingTag(StringBuffer buffer) {
358: buffer.append("</" + QUERY_RESPONSE + ">");
359: }
360:
361: }
|