001: package org.objectweb.celtix.bus.transports.jms;
002:
003: import java.io.ByteArrayInputStream;
004: import java.io.ByteArrayOutputStream;
005: import java.io.IOException;
006: import java.util.concurrent.Executor;
007: import java.util.concurrent.Future;
008: import java.util.logging.Level;
009: import java.util.logging.Logger;
010:
011: import javax.jms.Destination;
012: import javax.jms.JMSException;
013: import javax.jms.Message;
014: import javax.jms.Queue;
015: import javax.jms.QueueSender;
016: import javax.jms.TextMessage;
017: import javax.jms.Topic;
018: import javax.jms.TopicPublisher;
019: import javax.naming.NamingException;
020: import javax.wsdl.Port;
021: import javax.wsdl.WSDLException;
022: import javax.xml.ws.handler.MessageContext;
023:
024: import org.objectweb.celtix.Bus;
025: import org.objectweb.celtix.bindings.ClientBinding;
026: import org.objectweb.celtix.bindings.ResponseCallback;
027: import org.objectweb.celtix.bus.management.counters.TransportClientCounters;
028: import org.objectweb.celtix.common.logging.LogUtils;
029: import org.objectweb.celtix.configuration.Configuration;
030:
031: import org.objectweb.celtix.context.InputStreamMessageContext;
032: import org.objectweb.celtix.context.OutputStreamMessageContext;
033: import org.objectweb.celtix.transports.ClientTransport;
034: import org.objectweb.celtix.transports.jms.JMSClientBehaviorPolicyType;
035: import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType;
036: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
037: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
038:
039: public class JMSClientTransport extends JMSTransportBase implements
040: ClientTransport {
041:
042: private static final Logger LOG = LogUtils
043: .getL7dLogger(JMSClientTransport.class);
044: private static final long DEFAULT_RECEIVE_TIMEOUT = 0;
045:
046: protected boolean textPayload;
047: TransportClientCounters counters;
048: private JMSClientBehaviorPolicyType clientBehaviourPolicy;
049: private ResponseCallback responseCallback;
050:
051: public JMSClientTransport(Bus bus, EndpointReferenceType address,
052: ClientBinding binding) throws WSDLException, IOException {
053:
054: super (bus, address, false);
055: clientBehaviourPolicy = getClientPolicy(configuration);
056: counters = new TransportClientCounters("JMSClientTransport");
057:
058: EndpointReferenceUtils.setAddress(address,
059: getAddrUriFromJMSAddrPolicy());
060: targetEndpoint = address;
061:
062: textPayload = JMSConstants.TEXT_MESSAGE_TYPE
063: .equals(clientBehaviourPolicy.getMessageType().value());
064:
065: LOG.log(Level.FINE, "TEXT_MESSAGE_TYPE: ", textPayload);
066: LOG.log(Level.FINE, "QUEUE_DESTINATION_STYLE: ",
067: queueDestinationStyle);
068: if (binding != null) {
069: responseCallback = binding.createResponseCallback();
070: }
071: entry("JMSClientTransport Constructor");
072: }
073:
074: private JMSClientBehaviorPolicyType getClientPolicy(
075: Configuration conf) {
076: JMSClientBehaviorPolicyType pol = conf.getObject(
077: JMSClientBehaviorPolicyType.class, "jmsClient");
078: if (pol == null) {
079: pol = new JMSClientBehaviorPolicyType();
080: }
081: return pol;
082: }
083:
084: public JMSClientBehaviorPolicyType getJMSClientBehaviourPolicy() {
085:
086: return clientBehaviourPolicy;
087: }
088:
089: //TODO: Revisit for proper implementation and changes if any.
090:
091: public void shutdown() {
092: entry("JMSClientTransport shutdown()");
093:
094: // ensure resources held by session factory are released
095: //
096: if (sessionFactory != null) {
097: sessionFactory.shutdown();
098: }
099: }
100:
101: public EndpointReferenceType getTargetEndpoint() {
102: return targetEndpoint;
103: }
104:
105: public EndpointReferenceType getDecoupledEndpoint()
106: throws IOException {
107:
108: if (jmsAddressPolicy.getJndiReplyDestinationName() != null) {
109: EndpointReferenceType epr = new EndpointReferenceType();
110: EndpointReferenceUtils.setAddress(epr,
111: getReplyTotAddrUriFromJMSAddrPolicy());
112: return epr;
113: }
114:
115: return null;
116: }
117:
118: public Port getPort() {
119: return port;
120: }
121:
122: public OutputStreamMessageContext createOutputStreamContext(
123: MessageContext context) throws IOException {
124: return new JMSOutputStreamContext(context);
125: }
126:
127: public void finalPrepareOutputStreamContext(
128: OutputStreamMessageContext context) throws IOException {
129: }
130:
131: public InputStreamMessageContext invoke(
132: OutputStreamMessageContext context) throws IOException {
133:
134: if (!queueDestinationStyle) {
135: LOG
136: .log(Level.WARNING,
137: "Non-oneway invocations not supported for JMS Topics");
138: throw new IOException(
139: "Non-oneway invocations not supported for JMS Topics");
140: }
141:
142: try {
143: byte[] responseData = null;
144: if (textPayload) {
145: String responseString = (String) invoke(context, true);
146: responseData = responseString.getBytes();
147: } else {
148: responseData = (byte[]) invoke(context, true);
149: }
150: counters.getInvoke().increase();
151: JMSInputStreamContext respContext = new JMSInputStreamContext(
152: new ByteArrayInputStream(responseData));
153:
154: if (context
155: .containsKey(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS)) {
156: JMSMessageHeadersType responseHdr = (JMSMessageHeadersType) context
157: .remove(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
158: respContext.put(
159: JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
160: responseHdr);
161: respContext.setScope(
162: JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
163: MessageContext.Scope.APPLICATION);
164: }
165:
166: return respContext;
167: } catch (Exception ex) {
168: //TODO: decide what to do with the exception.
169: counters.getInvokeError().increase();
170: throw new IOException(ex.getMessage());
171: }
172: }
173:
174: /**
175: * Variant on invoke used for oneways.
176: *
177: * @param request the buffer to send
178: */
179: public void invokeOneway(OutputStreamMessageContext context)
180: throws IOException {
181: try {
182: invoke(context, false);
183: counters.getInvokeOneWay();
184: } catch (Exception ex) {
185: counters.getInvokeError().increase();
186: throw new IOException(ex.getMessage());
187: }
188: }
189:
190: public Future<InputStreamMessageContext> invokeAsync(
191: OutputStreamMessageContext context, Executor executor)
192: throws IOException {
193: return null;
194: }
195:
196: public ResponseCallback getResponseCallback() {
197: return responseCallback;
198: }
199:
200: /**
201: * Internal invoke mechanics.
202: *
203: * @param request the buffer to send
204: * @param responseExpected true iff a response is expected
205: * @return the response buffer if expected
206: */
207: private Object invoke(OutputStreamMessageContext context,
208: boolean responseExpected) throws JMSException,
209: NamingException {
210: entry("JMSClientTransport invoke()");
211:
212: try {
213: if (null == sessionFactory) {
214: JMSProviderHub.connect(this );
215: }
216: } catch (JMSException ex) {
217: LOG.log(Level.FINE,
218: "JMS connect failed with JMSException : ", ex);
219: throw ex;
220: } catch (NamingException e) {
221: LOG.log(Level.FINE,
222: "JMS connect failed with NamingException : ", e);
223: throw e;
224: }
225:
226: if (sessionFactory == null) {
227: throw new java.lang.IllegalStateException(
228: "JMSClientTransport not connected");
229: }
230:
231: PooledSession pooledSession = sessionFactory
232: .get(responseExpected);
233: send(pooledSession, context, responseExpected);
234:
235: Object response = null;
236:
237: if (responseExpected) {
238: response = receive(pooledSession, context);
239: }
240:
241: sessionFactory.recycle(pooledSession);
242:
243: return response;
244: }
245:
246: /**
247: * Send mechanics.
248: *
249: * @param request the request buffer
250: * @param pooledSession the shared JMS resources
251: */
252: private void send(PooledSession pooledSession,
253: OutputStreamMessageContext context, boolean responseExpected)
254: throws JMSException {
255: Object request;
256:
257: if (textPayload) {
258: request = context.getOutputStream().toString();
259: } else {
260: request = ((ByteArrayOutputStream) context
261: .getOutputStream()).toByteArray();
262: }
263:
264: Destination replyTo = pooledSession.destination();
265:
266: //We don't want to send temp queue in
267: //replyTo header for oneway calls
268: if (!responseExpected
269: && (jmsAddressPolicy.getJndiReplyDestinationName() == null)) {
270: replyTo = null;
271: }
272:
273: Message message = marshal(request, pooledSession.session(),
274: replyTo, clientBehaviourPolicy.getMessageType().value());
275: // message.get
276:
277: JMSMessageHeadersType headers = (JMSMessageHeadersType) context
278: .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
279:
280: int deliveryMode = getJMSDeliveryMode(headers);
281: int priority = getJMSPriority(headers);
282: String correlationID = getCorrelationId(headers);
283: long ttl = getTimeToLive(headers);
284: if (ttl <= 0) {
285: ttl = DEFAULT_RECEIVE_TIMEOUT;
286: }
287:
288: setMessageProperties(headers, message);
289: if (responseExpected) {
290: String id = pooledSession.getCorrelationID();
291:
292: if (id != null) {
293: if (correlationID != null) {
294: String error = "User cannot set JMSCorrelationID when "
295: + "making a request/reply invocation using "
296: + "a static replyTo Queue.";
297: throw new JMSException(error);
298: }
299: correlationID = id;
300: }
301: }
302:
303: if (correlationID != null) {
304: message.setJMSCorrelationID(correlationID);
305: } else {
306: //No message correlation id is set. Whatever comeback will be accepted as responses.
307: // We assume that it will only happen in case of the temp. reply queue.
308: }
309:
310: LOG.log(Level.FINE, "client sending request: ", message);
311:
312: if (queueDestinationStyle) {
313: QueueSender sender = (QueueSender) pooledSession.producer();
314: sender.setTimeToLive(ttl);
315: sender.send((Queue) targetDestination, message,
316: deliveryMode, priority, ttl);
317: } else {
318: TopicPublisher publisher = (TopicPublisher) pooledSession
319: .producer();
320: publisher.setTimeToLive(ttl);
321: publisher.publish((Topic) targetDestination, message,
322: deliveryMode, priority, ttl);
323: }
324: }
325:
326: /**
327: * Receive mechanics.
328: *
329: * @param pooledSession the shared JMS resources
330: * @retrun the response buffer
331: */
332: private Object receive(PooledSession pooledSession,
333: OutputStreamMessageContext context) throws JMSException {
334: Object response = null;
335:
336: long timeout = DEFAULT_RECEIVE_TIMEOUT;
337:
338: Long receiveTimeout = (Long) context
339: .get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
340:
341: if (receiveTimeout != null) {
342: timeout = receiveTimeout.longValue();
343: }
344:
345: Message message = pooledSession.consumer().receive(timeout);
346: LOG.log(Level.FINE, "client received reply: ", message);
347:
348: if (message != null) {
349:
350: populateIncomingContext(message, context,
351: JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
352: String messageType = message instanceof TextMessage ? JMSConstants.TEXT_MESSAGE_TYPE
353: : JMSConstants.BINARY_MESSAGE_TYPE;
354: response = unmarshal(message, messageType);
355: return response;
356: } else {
357: String error = "JMSClientTransport.receive() timed out. No message available.";
358: LOG.log(Level.SEVERE, error);
359: //TODO: Review what exception should we throw.
360: //throw new JMSException(error);
361: return null;
362: }
363: }
364: }
|