001: package org.objectweb.celtix.bus.transports.jms;
002:
003: import java.util.Enumeration;
004: import java.util.List;
005: import java.util.logging.Level;
006: import java.util.logging.Logger;
007:
008: import javax.jms.Destination;
009: import javax.jms.JMSException;
010: import javax.jms.Message;
011: import javax.jms.ObjectMessage;
012: import javax.jms.Session;
013: import javax.jms.TextMessage;
014: import javax.wsdl.Port;
015: import javax.wsdl.WSDLException;
016: import javax.xml.ws.handler.MessageContext;
017:
018: import org.objectweb.celtix.Bus;
019: import org.objectweb.celtix.bus.configuration.wsdl.WsdlJMSConfigurationProvider;
020: import org.objectweb.celtix.common.logging.LogUtils;
021: import org.objectweb.celtix.configuration.Configuration;
022: import org.objectweb.celtix.configuration.ConfigurationBuilder;
023: import org.objectweb.celtix.configuration.ConfigurationBuilderFactory;
024: import org.objectweb.celtix.transports.jms.JMSAddressPolicyType;
025: import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType;
026: import org.objectweb.celtix.transports.jms.context.JMSPropertyType;
027: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
028: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
029:
030: public class JMSTransportBase {
031: //--Member Variables--------------------------------------------------------
032: private static final Logger LOG = LogUtils
033: .getL7dLogger(JMSTransportBase.class);
034: protected JMSAddressPolicyType jmsAddressPolicy;
035: protected boolean queueDestinationStyle;
036: protected Destination targetDestination;
037: protected Destination replyDestination;
038: protected JMSSessionFactory sessionFactory;
039: protected Bus bus;
040: protected EndpointReferenceType targetEndpoint;
041: protected Port port;
042: protected Configuration configuration;
043:
044: //--Constructors------------------------------------------------------------
045: public JMSTransportBase(Bus b, EndpointReferenceType epr,
046: boolean isServer) throws WSDLException {
047: bus = b;
048:
049: port = EndpointReferenceUtils
050: .getPort(bus.getWSDLManager(), epr);
051:
052: configuration = createConfiguration(bus, epr, isServer);
053: jmsAddressPolicy = getAddressPolicy(configuration);
054: targetEndpoint = epr;
055: queueDestinationStyle = JMSConstants.JMS_QUEUE
056: .equals(jmsAddressPolicy.getDestinationStyle().value());
057: }
058:
059: private JMSAddressPolicyType getAddressPolicy(Configuration conf) {
060: JMSAddressPolicyType pol = conf.getObject(
061: JMSAddressPolicyType.class, "jmsAddress");
062: if (pol == null) {
063: pol = new JMSAddressPolicyType();
064: }
065: return pol;
066: }
067:
068: private Configuration createConfiguration(Bus b,
069: EndpointReferenceType ref, boolean isServer) {
070: ConfigurationBuilder cb = ConfigurationBuilderFactory
071: .getBuilder(null);
072:
073: Configuration busConfiguration = b.getConfiguration();
074: Configuration parent = null;
075:
076: String configURI;
077: String configID;
078:
079: if (isServer) {
080: configURI = JMSConstants.JMS_SERVER_CONFIGURATION_URI;
081: configID = JMSConstants.JMS_SERVER_CONFIG_ID;
082: parent = busConfiguration.getChild(
083: JMSConstants.ENDPOINT_CONFIGURATION_URI,
084: EndpointReferenceUtils.getServiceName(ref)
085: .toString());
086: } else {
087: configURI = JMSConstants.JMS_CLIENT_CONFIGURATION_URI;
088: configID = JMSConstants.JMS_CLIENT_CONFIG_ID;
089: String id = EndpointReferenceUtils.getServiceName(ref)
090: .toString()
091: + "/" + EndpointReferenceUtils.getPortName(ref);
092: parent = busConfiguration.getChild(
093: JMSConstants.PORT_CONFIGURATION_URI, id);
094: }
095:
096: assert null != parent;
097:
098: Configuration cfg = cb.getConfiguration(configURI, configID,
099: parent);
100: if (null == cfg) {
101: cfg = cb.buildConfiguration(configURI, configID, parent);
102: }
103: // register the additional provider
104: if (null != port) {
105: cfg.getProviders().add(
106: new WsdlJMSConfigurationProvider(port, false));
107: }
108: return cfg;
109: }
110:
111: //--Methods-----------------------------------------------------------------
112:
113: public final JMSAddressPolicyType getJmsAddressDetails() {
114: return jmsAddressPolicy;
115: }
116:
117: /**
118: * Callback from the JMSProviderHub indicating the ClientTransport has
119: * been sucessfully connected.
120: *
121: * @param targetDestination the target destination
122: * @param sessionFactory used to get access to a pooled JMS resources
123: */
124: protected void connected(Destination target, Destination reply,
125: JMSSessionFactory factory) {
126: targetDestination = target;
127: replyDestination = reply;
128: sessionFactory = factory;
129: }
130:
131: /**
132: * Create a JMS of the appropriate type populated with the given payload.
133: *
134: * @param payload the message payload, expected to be either of type
135: * String or byte[] depending on payload type
136: * @param session the JMS session
137: * @param replyTo the ReplyTo destination if any
138: * @return a JMS of the appropriate type populated with the given payload
139: */
140: protected Message marshal(Object payload, Session session,
141: Destination replyTo, String messageType)
142: throws JMSException {
143: Message message = null;
144:
145: if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) {
146: message = session.createTextMessage((String) payload);
147: } else {
148: message = session.createObjectMessage();
149: ((ObjectMessage) message).setObject((byte[]) payload);
150: }
151:
152: if (replyTo != null) {
153: message.setJMSReplyTo(replyTo);
154: }
155:
156: return message;
157: }
158:
159: /**
160: * Unmarshal the payload of an incoming message.
161: *
162: * @param message the incoming message
163: * @return the unmarshalled message payload, either of type String or
164: * byte[] depending on payload type
165: */
166: protected Object unmarshal(Message message, String messageType)
167: throws JMSException {
168: Object ret = null;
169:
170: if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) {
171: ret = ((TextMessage) message).getText();
172: } else {
173: ret = (byte[]) ((ObjectMessage) message).getObject();
174: }
175:
176: return ret;
177: }
178:
179: protected final void entry(String trace) {
180: LOG.log(Level.FINE, trace);
181: }
182:
183: protected JMSMessageHeadersType populateIncomingContext(
184: Message message, MessageContext context, String headerType)
185: throws JMSException {
186: JMSMessageHeadersType headers = null;
187:
188: headers = (JMSMessageHeadersType) context.get(headerType);
189:
190: if (headers == null) {
191: headers = new JMSMessageHeadersType();
192: context.put(headerType, headers);
193: }
194:
195: headers.setJMSCorrelationID(message.getJMSCorrelationID());
196: headers.setJMSDeliveryMode(new Integer(message
197: .getJMSDeliveryMode()));
198: headers.setJMSExpiration(new Long(message.getJMSExpiration()));
199: headers.setJMSMessageID(message.getJMSMessageID());
200: headers.setJMSPriority(new Integer(message.getJMSPriority()));
201: headers.setJMSRedelivered(Boolean.valueOf(message
202: .getJMSRedelivered()));
203: headers.setJMSTimeStamp(new Long(message.getJMSTimestamp()));
204: headers.setJMSType(message.getJMSType());
205:
206: List<JMSPropertyType> props = headers.getProperty();
207: Enumeration enm = message.getPropertyNames();
208: while (enm.hasMoreElements()) {
209: String name = (String) enm.nextElement();
210: String val = message.getStringProperty(name);
211: JMSPropertyType prop = new JMSPropertyType();
212: prop.setName(name);
213: prop.setValue(val);
214: props.add(prop);
215: }
216:
217: return headers;
218: }
219:
220: protected int getJMSDeliveryMode(JMSMessageHeadersType headers) {
221: int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
222:
223: if (headers != null && headers.isSetJMSDeliveryMode()) {
224: deliveryMode = headers.getJMSDeliveryMode();
225: }
226: return deliveryMode;
227: }
228:
229: protected int getJMSPriority(JMSMessageHeadersType headers) {
230: int priority = Message.DEFAULT_PRIORITY;
231: if (headers != null && headers.isSetJMSPriority()) {
232: priority = headers.getJMSPriority();
233: }
234: return priority;
235: }
236:
237: protected long getTimeToLive(JMSMessageHeadersType headers) {
238: long ttl = -1;
239: if (headers != null && headers.isSetTimeToLive()) {
240: ttl = headers.getTimeToLive();
241: }
242: return ttl;
243: }
244:
245: protected String getCorrelationId(JMSMessageHeadersType headers) {
246: String correlationId = null;
247: if (headers != null && headers.isSetJMSCorrelationID()) {
248: correlationId = headers.getJMSCorrelationID();
249: }
250: return correlationId;
251: }
252:
253: protected void setMessageProperties(JMSMessageHeadersType headers,
254: Message message) throws JMSException {
255:
256: if (headers != null && headers.isSetProperty()) {
257: List<JMSPropertyType> props = headers.getProperty();
258: for (int x = 0; x < props.size(); x++) {
259: message.setStringProperty(props.get(x).getName(), props
260: .get(x).getValue());
261: }
262: }
263: }
264:
265: protected String getAddrUriFromJMSAddrPolicy() {
266: return "jms:" + jmsAddressPolicy.getJndiConnectionFactoryName()
267: + "#" + jmsAddressPolicy.getJndiDestinationName();
268: }
269:
270: protected String getReplyTotAddrUriFromJMSAddrPolicy() {
271: return "jms:" + jmsAddressPolicy.getJndiConnectionFactoryName()
272: + "#" + jmsAddressPolicy.getJndiReplyDestinationName();
273: }
274: }
|