001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.jms;
019:
020: import java.util.Enumeration;
021: import java.util.List;
022:
023: import javax.jms.Destination;
024: import javax.jms.JMSException;
025: import javax.jms.Message;
026: import javax.jms.ObjectMessage;
027: import javax.jms.Session;
028: import javax.jms.TextMessage;
029:
030: import org.apache.cxf.Bus;
031: import org.apache.cxf.service.model.EndpointInfo;
032:
033: public class JMSTransportBase {
034:
035: protected Destination targetDestination;
036: protected Destination replyDestination;
037: protected JMSSessionFactory sessionFactory;
038: protected Bus bus;
039: //protected EndpointReferenceType targetEndpoint;
040: protected EndpointInfo endpointInfo;
041: protected String beanNameSuffix;
042: private JMSTransport transport;
043:
044: //--Constructors------------------------------------------------------------
045: public JMSTransportBase(Bus b, EndpointInfo endpoint,
046: boolean isServer, String suffix, JMSTransport transport) {
047: bus = b;
048: endpointInfo = endpoint;
049: beanNameSuffix = suffix;
050: this .transport = transport;
051: }
052:
053: /**
054: * Callback from the JMSProviderHub indicating the ClientTransport has
055: * been sucessfully connected.
056: *
057: * @param targetDestination the target destination
058: * @param sessionFactory used to get access to a pooled JMS resources
059: */
060: protected void connected(Destination target, Destination reply,
061: JMSSessionFactory factory) {
062: targetDestination = target;
063: replyDestination = reply;
064: sessionFactory = factory;
065: }
066:
067: /**
068: * Create a JMS of the appropriate type populated with the given payload.
069: *
070: * @param payload the message payload, expected to be either of type
071: * String or byte[] depending on payload type
072: * @param session the JMS session
073: * @param replyTo the ReplyTo destination if any
074: * @return a JMS of the appropriate type populated with the given payload
075: */
076: protected Message marshal(Object payload, Session session,
077: Destination replyTo, String messageType)
078: throws JMSException {
079: Message message = null;
080:
081: if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) {
082: message = session.createTextMessage((String) payload);
083: } else {
084: message = session.createObjectMessage();
085: ((ObjectMessage) message).setObject((byte[]) payload);
086: }
087:
088: if (replyTo != null) {
089: message.setJMSReplyTo(replyTo);
090: }
091:
092: return message;
093: }
094:
095: /**
096: * Unmarshal the payload of an incoming message.
097: *
098: * @param message the incoming message
099: * @return the unmarshalled message payload, either of type String or
100: * byte[] depending on payload type
101: */
102: protected Object unmarshal(Message message, String messageType)
103: throws JMSException {
104: Object ret = null;
105:
106: if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) {
107: ret = ((TextMessage) message).getText();
108: } else {
109: ret = (byte[]) ((ObjectMessage) message).getObject();
110: }
111:
112: return ret;
113: }
114:
115: protected JMSMessageHeadersType populateIncomingContext(
116: javax.jms.Message message,
117: org.apache.cxf.message.Message inMessage, String headerType)
118: throws JMSException {
119: JMSMessageHeadersType headers = null;
120:
121: headers = (JMSMessageHeadersType) inMessage.get(headerType);
122:
123: if (headers == null) {
124: headers = new JMSMessageHeadersType();
125: inMessage.put(headerType, headers);
126: }
127:
128: headers.setJMSCorrelationID(message.getJMSCorrelationID());
129: headers.setJMSDeliveryMode(new Integer(message
130: .getJMSDeliveryMode()));
131: headers.setJMSExpiration(new Long(message.getJMSExpiration()));
132: headers.setJMSMessageID(message.getJMSMessageID());
133: headers.setJMSPriority(new Integer(message.getJMSPriority()));
134: headers.setJMSRedelivered(Boolean.valueOf(message
135: .getJMSRedelivered()));
136: headers.setJMSTimeStamp(new Long(message.getJMSTimestamp()));
137: headers.setJMSType(message.getJMSType());
138:
139: List<JMSPropertyType> props = headers.getProperty();
140: Enumeration enm = message.getPropertyNames();
141: while (enm.hasMoreElements()) {
142: String name = (String) enm.nextElement();
143: String val = message.getStringProperty(name);
144: JMSPropertyType prop = new JMSPropertyType();
145: prop.setName(name);
146: prop.setValue(val);
147: props.add(prop);
148: }
149:
150: return headers;
151: }
152:
153: protected int getJMSDeliveryMode(JMSMessageHeadersType headers) {
154: int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
155:
156: if (headers != null && headers.isSetJMSDeliveryMode()) {
157: deliveryMode = headers.getJMSDeliveryMode();
158: }
159: return deliveryMode;
160: }
161:
162: protected int getJMSPriority(JMSMessageHeadersType headers) {
163: int priority = Message.DEFAULT_PRIORITY;
164: if (headers != null && headers.isSetJMSPriority()) {
165: priority = headers.getJMSPriority();
166: }
167: return priority;
168: }
169:
170: protected long getTimeToLive(JMSMessageHeadersType headers) {
171: long ttl = -1;
172: if (headers != null && headers.isSetTimeToLive()) {
173: ttl = headers.getTimeToLive();
174: }
175: return ttl;
176: }
177:
178: protected String getCorrelationId(JMSMessageHeadersType headers) {
179: String correlationId = null;
180: if (headers != null && headers.isSetJMSCorrelationID()) {
181: correlationId = headers.getJMSCorrelationID();
182: }
183: return correlationId;
184: }
185:
186: protected void setMessageProperties(JMSMessageHeadersType headers,
187: Message message) throws JMSException {
188:
189: if (headers != null && headers.isSetProperty()) {
190: List<JMSPropertyType> props = headers.getProperty();
191: for (int x = 0; x < props.size(); x++) {
192: message.setStringProperty(props.get(x).getName(), props
193: .get(x).getValue());
194: }
195: }
196: }
197:
198: protected String getAddrUriFromJMSAddrPolicy() {
199: AddressType jmsAddressPolicy = transport.getJMSAddress();
200: return "jms:" + jmsAddressPolicy.getJndiConnectionFactoryName()
201: + "#" + jmsAddressPolicy.getJndiDestinationName();
202: }
203:
204: protected String getReplyTotAddrUriFromJMSAddrPolicy() {
205: AddressType jmsAddressPolicy = transport.getJMSAddress();
206: return "jms:" + jmsAddressPolicy.getJndiConnectionFactoryName()
207: + "#" + jmsAddressPolicy.getJndiReplyDestinationName();
208: }
209:
210: protected boolean isDestinationStyleQueue() {
211: return JMSConstants.JMS_QUEUE.equals(transport.getJMSAddress()
212: .getDestinationStyle().value());
213: }
214: }
|