001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.jms;
019:
020: import java.io.ByteArrayInputStream;
021: import java.io.ByteArrayOutputStream;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.OutputStream;
025: import java.util.logging.Level;
026: import java.util.logging.Logger;
027:
028: import javax.jms.JMSException;
029: import javax.jms.Queue;
030: import javax.jms.QueueSender;
031: import javax.jms.TextMessage;
032: import javax.jms.Topic;
033: import javax.jms.TopicPublisher;
034: import javax.naming.NamingException;
035:
036: import org.apache.cxf.Bus;
037: import org.apache.cxf.common.logging.LogUtils;
038: import org.apache.cxf.configuration.Configurable;
039: import org.apache.cxf.configuration.Configurer;
040: import org.apache.cxf.io.CachedOutputStream;
041: import org.apache.cxf.message.Exchange;
042: import org.apache.cxf.message.Message;
043: import org.apache.cxf.message.MessageImpl;
044: import org.apache.cxf.service.model.EndpointInfo;
045: import org.apache.cxf.transport.AbstractConduit;
046: import org.apache.cxf.transport.Conduit;
047: import org.apache.cxf.transport.Destination;
048: import org.apache.cxf.transport.MessageObserver;
049: import org.apache.cxf.ws.addressing.EndpointReferenceType;
050:
051: public class JMSConduit extends AbstractConduit implements
052: Configurable, JMSTransport {
053:
054: protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
055:
056: private static final Logger LOG = LogUtils
057: .getL7dLogger(JMSConduit.class);
058:
059: protected final JMSTransportBase base;
060: protected ClientConfig clientConfig;
061: protected ClientBehaviorPolicyType runtimePolicy;
062: protected AddressType address;
063: protected SessionPoolType sessionPool;
064:
/**
 * Creates a conduit for the given endpoint without an explicit target
 * reference; equivalent to calling the three-argument constructor with a
 * {@code null} target.
 *
 * @param b the bus this conduit belongs to
 * @param endpointInfo the endpoint description the configuration is read from
 */
public JMSConduit(Bus b, EndpointInfo endpointInfo) {
    this(b, endpointInfo, null);
}
068:
/**
 * Creates a conduit for the given endpoint and target reference, builds the
 * shared {@link JMSTransportBase} and loads the conduit configuration.
 *
 * @param b the bus this conduit belongs to
 * @param endpointInfo the endpoint description the configuration is read from
 * @param target the target endpoint reference handed to the superclass; may be {@code null}
 */
public JMSConduit(Bus b, EndpointInfo endpointInfo, EndpointReferenceType target) {
    super(target);
    base = new JMSTransportBase(b, endpointInfo, false, BASE_BEAN_NAME_SUFFIX, this);
    initConfig();
}
078:
079: // prepare the message for send out , not actually send out the message
080: public void prepare(Message message) throws IOException {
081: getLogger().log(Level.FINE, "JMSConduit send message");
082:
083: try {
084: if (null == base.sessionFactory) {
085: JMSProviderHub.connect(this );
086: }
087: } catch (JMSException jmsex) {
088: getLogger().log(Level.WARNING,
089: "JMS connect failed with JMSException : ", jmsex);
090: throw new IOException(jmsex.toString());
091: } catch (NamingException ne) {
092: getLogger().log(Level.WARNING,
093: "JMS connect failed with NamingException : ", ne);
094: throw new IOException(ne.toString());
095: }
096:
097: if (base.sessionFactory == null) {
098: throw new java.lang.IllegalStateException(
099: "JMSClientTransport not connected");
100: }
101:
102: try {
103: boolean isOneWay = false;
104: //test if the message is oneway message
105: Exchange ex = message.getExchange();
106: if (null != ex) {
107: isOneWay = ex.isOneWay();
108: }
109: //get the pooledSession with response expected
110: PooledSession pooledSession = base.sessionFactory
111: .get(!isOneWay);
112: // put the PooledSession into the outMessage
113: message.put(JMSConstants.JMS_POOLEDSESSION, pooledSession);
114:
115: } catch (JMSException jmsex) {
116: throw new IOException(jmsex.getMessage());
117: }
118:
119: message.setContent(OutputStream.class, new JMSOutputStream(
120: message));
121:
122: }
123:
124: public void close() {
125: getLogger().log(Level.FINE, "JMSConduit closed ");
126:
127: // ensure resources held by session factory are released
128: //
129: if (base.sessionFactory != null) {
130: base.sessionFactory.shutdown();
131: }
132: }
133:
134: protected Logger getLogger() {
135: return LOG;
136: }
137:
138: /**
139: * Receive mechanics.
140: *
141: * @param pooledSession the shared JMS resources
142: * @retrun the response buffer
143: */
144: private Object receive(PooledSession pooledSession,
145: Message outMessage) throws JMSException {
146:
147: Object result = null;
148:
149: long timeout = getClientConfig().getClientReceiveTimeout();
150:
151: Long receiveTimeout = (Long) outMessage
152: .get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
153:
154: if (receiveTimeout != null) {
155: timeout = receiveTimeout.longValue();
156: }
157:
158: javax.jms.Message jmsMessage = pooledSession.consumer()
159: .receive(timeout);
160: getLogger().log(Level.FINE, "client received reply: ",
161: jmsMessage);
162:
163: if (jmsMessage != null) {
164:
165: base.populateIncomingContext(jmsMessage, outMessage,
166: JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
167: String messageType = jmsMessage instanceof TextMessage ? JMSConstants.TEXT_MESSAGE_TYPE
168: : JMSConstants.BINARY_MESSAGE_TYPE;
169: result = base.unmarshal(jmsMessage, messageType);
170: return result;
171: } else {
172: String error = "JMSClientTransport.receive() timed out. No message available.";
173: getLogger().log(Level.SEVERE, error);
174: //TODO: Review what exception should we throw.
175: throw new JMSException(error);
176:
177: }
178: }
179:
180: public void connected(javax.jms.Destination target,
181: javax.jms.Destination reply, JMSSessionFactory factory) {
182: base.connected(target, reply, factory);
183: }
184:
185: public String getBeanName() {
186: return base.endpointInfo.getName().toString() + ".jms-conduit";
187: }
188:
189: private void initConfig() {
190:
191: this .address = base.endpointInfo.getTraversedExtensor(
192: new AddressType(), AddressType.class);
193: this .sessionPool = base.endpointInfo.getTraversedExtensor(
194: new SessionPoolType(), SessionPoolType.class);
195: this .clientConfig = base.endpointInfo.getTraversedExtensor(
196: new ClientConfig(), ClientConfig.class);
197: this .runtimePolicy = base.endpointInfo.getTraversedExtensor(
198: new ClientBehaviorPolicyType(),
199: ClientBehaviorPolicyType.class);
200:
201: Configurer configurer = base.bus.getExtension(Configurer.class);
202: if (null != configurer) {
203: configurer.configureBean(this );
204: }
205: }
206:
207: private boolean isTextPayload() {
208: return JMSConstants.TEXT_MESSAGE_TYPE.equals(getRuntimePolicy()
209: .getMessageType().value());
210: }
211:
212: public AddressType getJMSAddress() {
213: return address;
214: }
215:
216: public void setJMSAddress(AddressType a) {
217: this .address = a;
218: }
219:
220: public ClientConfig getClientConfig() {
221: return clientConfig;
222: }
223:
224: public void setClientConfig(ClientConfig clientConfig) {
225: this .clientConfig = clientConfig;
226: }
227:
228: public ClientBehaviorPolicyType getRuntimePolicy() {
229: return runtimePolicy;
230: }
231:
232: public void setRuntimePolicy(ClientBehaviorPolicyType runtimePolicy) {
233: this .runtimePolicy = runtimePolicy;
234: }
235:
236: public SessionPoolType getSessionPool() {
237: return sessionPool;
238: }
239:
240: public void setSessionPool(SessionPoolType sessionPool) {
241: this .sessionPool = sessionPool;
242: }
243:
244: private class JMSOutputStream extends CachedOutputStream {
245: private Message outMessage;
246: private javax.jms.Message jmsMessage;
247: private PooledSession pooledSession;
248: private boolean isOneWay;
249:
250: public JMSOutputStream(Message m) {
251: outMessage = m;
252: pooledSession = (PooledSession) outMessage
253: .get(JMSConstants.JMS_POOLEDSESSION);
254: }
255:
256: protected void doFlush() throws IOException {
257: //do nothing here
258: }
259:
260: protected void doClose() throws IOException {
261: try {
262: isOneWay = outMessage.getExchange().isOneWay();
263: commitOutputMessage();
264: if (!isOneWay) {
265: handleResponse();
266: }
267: base.sessionFactory.recycle(pooledSession);
268: } catch (JMSException jmsex) {
269: getLogger().log(Level.WARNING,
270: "JMS connect failed with JMSException : ",
271: jmsex);
272: throw new IOException(jmsex.toString());
273: }
274: }
275:
276: protected void onWrite() throws IOException {
277:
278: }
279:
280: private void commitOutputMessage() throws JMSException {
281: Object request = null;
282:
283: if (isTextPayload()) {
284: request = currentStream.toString();
285: } else {
286: request = ((ByteArrayOutputStream) currentStream)
287: .toByteArray();
288: }
289:
290: getLogger().log(Level.FINE,
291: "Conduit Request is :[" + request + "]");
292: javax.jms.Destination replyTo = pooledSession.destination();
293:
294: //TODO setting up the responseExpected
295:
296: //We don't want to send temp queue in
297: //replyTo header for oneway calls
298: if (isOneWay
299: && (getJMSAddress().getJndiReplyDestinationName() == null)) {
300: replyTo = null;
301: }
302:
303: jmsMessage = base.marshal(request, pooledSession.session(),
304: replyTo, getRuntimePolicy().getMessageType()
305: .value());
306:
307: JMSMessageHeadersType headers = (JMSMessageHeadersType) outMessage
308: .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
309:
310: int deliveryMode = base.getJMSDeliveryMode(headers);
311: int priority = base.getJMSPriority(headers);
312: String correlationID = base.getCorrelationId(headers);
313: long ttl = base.getTimeToLive(headers);
314: if (ttl <= 0) {
315: ttl = getClientConfig().getMessageTimeToLive();
316: }
317:
318: base.setMessageProperties(headers, jmsMessage);
319:
320: if (!isOneWay) {
321: String id = pooledSession.getCorrelationID();
322:
323: if (id != null) {
324: if (correlationID != null) {
325: String error = "User cannot set JMSCorrelationID when "
326: + "making a request/reply invocation using "
327: + "a static replyTo Queue.";
328: throw new JMSException(error);
329: }
330: correlationID = id;
331: }
332: }
333:
334: if (correlationID != null) {
335: jmsMessage.setJMSCorrelationID(correlationID);
336: } else {
337: //No message correlation id is set. Whatever comeback will be accepted as responses.
338: // We assume that it will only happen in case of the temp. reply queue.
339: }
340:
341: getLogger().log(Level.FINE, "client sending request: ",
342: jmsMessage);
343: //getting Destination Style
344: if (base.isDestinationStyleQueue()) {
345: QueueSender sender = (QueueSender) pooledSession
346: .producer();
347: sender.setTimeToLive(ttl);
348: sender.send((Queue) base.targetDestination, jmsMessage,
349: deliveryMode, priority, ttl);
350: } else {
351: TopicPublisher publisher = (TopicPublisher) pooledSession
352: .producer();
353: publisher.setTimeToLive(ttl);
354: publisher.publish((Topic) base.targetDestination,
355: jmsMessage, deliveryMode, priority, ttl);
356: }
357: }
358:
359: private void handleResponse() throws IOException {
360: // REVISIT distinguish decoupled case or oneway call
361: Object response = null;
362:
363: //TODO if outMessage need to get the response
364: Message inMessage = new MessageImpl();
365: outMessage.getExchange().setInMessage(inMessage);
366: //set the message header back to the incomeMessage
367: //inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
368: // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
369:
370: try {
371: response = receive(pooledSession, outMessage);
372: } catch (JMSException jmsex) {
373: getLogger().log(Level.FINE,
374: "JMS connect failed with JMSException : ",
375: jmsex);
376: throw new IOException(jmsex.toString());
377: }
378:
379: //set the message header back to the incomeMessage
380: inMessage
381: .put(
382: JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
383: outMessage
384: .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
385:
386: getLogger().log(Level.FINE,
387: "The Response Message is : [" + response + "]");
388:
389: // setup the inMessage response stream
390: byte[] bytes = null;
391: if (response instanceof String) {
392: String requestString = (String) response;
393: bytes = requestString.getBytes();
394: } else {
395: bytes = (byte[]) response;
396: }
397: inMessage.setContent(InputStream.class,
398: new ByteArrayInputStream(bytes));
399: getLogger().log(Level.FINE,
400: "incoming observer is " + incomingObserver);
401: incomingObserver.onMessage(inMessage);
402: }
403: }
404:
405: /**
406: * Represented decoupled response endpoint.
407: */
408: protected class DecoupledDestination implements Destination {
409: protected MessageObserver decoupledMessageObserver;
410: private EndpointReferenceType address;
411:
412: DecoupledDestination(EndpointReferenceType ref,
413: MessageObserver incomingObserver) {
414: address = ref;
415: decoupledMessageObserver = incomingObserver;
416: }
417:
418: public EndpointReferenceType getAddress() {
419: return address;
420: }
421:
422: public Conduit getBackChannel(Message inMessage,
423: Message partialResponse, EndpointReferenceType addr)
424: throws IOException {
425: // shouldn't be called on decoupled endpoint
426: return null;
427: }
428:
429: public void shutdown() {
430: // TODO Auto-generated method stub
431: }
432:
433: public synchronized void setMessageObserver(
434: MessageObserver observer) {
435: decoupledMessageObserver = observer;
436: }
437:
438: public synchronized MessageObserver getMessageObserver() {
439: return decoupledMessageObserver;
440: }
441: }
442:
443: }
|