001: package org.objectweb.celtix.bus.transports.jms;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.net.URL;
006: import java.util.concurrent.Executor;
007:
008: import javax.jms.DeliveryMode;
009: import javax.wsdl.Port;
010: import javax.wsdl.WSDLException;
011: import javax.xml.namespace.QName;
012: import javax.xml.ws.handler.MessageContext;
013:
014: import junit.framework.Test;
015: import junit.framework.TestCase;
016: import junit.framework.TestSuite;
017:
018: import org.objectweb.celtix.Bus;
019: import org.objectweb.celtix.BusException;
020: import org.objectweb.celtix.bindings.ClientBinding;
021: import org.objectweb.celtix.bus.workqueue.WorkQueueManagerImpl;
022:
023: import org.objectweb.celtix.configuration.Configuration;
024: import org.objectweb.celtix.configuration.ConfigurationBuilder;
025: import org.objectweb.celtix.configuration.ConfigurationBuilderFactory;
026:
027: import org.objectweb.celtix.context.GenericMessageContext;
028: import org.objectweb.celtix.context.InputStreamMessageContext;
029: import org.objectweb.celtix.context.OutputStreamMessageContext;
030: import org.objectweb.celtix.transports.ServerTransport;
031: import org.objectweb.celtix.transports.ServerTransportCallback;
032:
033: import org.objectweb.celtix.transports.jms.JMSAddressPolicyType;
034: import org.objectweb.celtix.transports.jms.JMSClientBehaviorPolicyType;
035: import org.objectweb.celtix.transports.jms.JMSServerBehaviorPolicyType;
036: import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType;
037:
038: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
039: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
040: import org.objectweb.celtix.wsdl.JAXBExtensionHelper;
041:
042: public class JMSContextTest extends TestCase {
043:
044: public static final String TEST_CORRELATION_ID = "TestCorrelationId";
045: private ServerTransportCallback callback;
046: private Bus bus;
047: private WorkQueueManagerImpl wqm;
048:
049: public JMSContextTest(String arg0) {
050: super (arg0);
051: }
052:
053: public static void main(String[] args) {
054: junit.textui.TestRunner.run(JMSContextTest.suite());
055: }
056:
057: public static Test suite() {
058: TestSuite suite = new TestSuite(JMSContextTest.class);
059: return new JMSBrokerSetup(suite, "tcp://localhost:61500");
060: }
061:
062: public void setUp() throws Exception {
063: bus = Bus.init();
064: JAXBExtensionHelper.addExtensions(bus.getWSDLManager()
065: .getExtenstionRegistry(), Port.class,
066: JMSAddressPolicyType.class);
067: JAXBExtensionHelper.addExtensions(bus.getWSDLManager()
068: .getExtenstionRegistry(), Port.class,
069: JMSServerBehaviorPolicyType.class);
070: JAXBExtensionHelper.addExtensions(bus.getWSDLManager()
071: .getExtenstionRegistry(), Port.class,
072: JMSClientBehaviorPolicyType.class);
073: }
074:
075: public void tearDown() throws Exception {
076: if (wqm != null) {
077: wqm.shutdown(true);
078: }
079: if (bus != null) {
080: bus.shutdown(true);
081: }
082: }
083:
084: public void testTwoWayTextQueueJMSTransport() throws Exception {
085: QName serviceName = new QName(
086: "http://celtix.objectweb.org/hello_world_jms",
087: "HelloWorldService");
088: doTestJMSTransport(false, serviceName, "HelloWorldPort",
089: "/wsdl/jms_test.wsdl");
090: }
091:
092: private int readBytes(byte bytes[], InputStream ins)
093: throws IOException {
094: int len = ins.read(bytes);
095: int total = 0;
096: while (len != -1) {
097: total += len;
098: len = ins.read(bytes, total, bytes.length - total);
099: }
100: return total;
101: }
102:
103: public class TestServerTransportCallback implements
104: ServerTransportCallback {
105: boolean useAutomaticWorkQueue;
106:
107: public TestServerTransportCallback(boolean useAutoWQ) {
108: useAutomaticWorkQueue = useAutoWQ;
109: }
110:
111: public void dispatch(InputStreamMessageContext ctx,
112: ServerTransport transport) {
113:
114: try {
115: byte bytes[] = new byte[10000];
116:
117: int total = readBytes(bytes, ctx.getInputStream());
118:
119: JMSOutputStreamContext octx = (JMSOutputStreamContext) transport
120: .createOutputStreamContext(ctx);
121: octx.setOneWay(false);
122: transport.finalPrepareOutputStreamContext(octx);
123: octx.getOutputStream().write(bytes, 0, total);
124: octx.getOutputStream().flush();
125:
126: MessageContext replyCtx = new GenericMessageContext();
127: ctx.put("ObjectMessageContext.MESSAGE_INPUT",
128: Boolean.TRUE);
129: replyCtx.putAll(ctx);
130: replyCtx.put("ObjectMessageContext.MESSAGE_INPUT",
131: Boolean.TRUE);
132:
133: ((JMSServerTransport) transport).postDispatch(replyCtx,
134: octx);
135: octx.getOutputStream().close();
136: } catch (Exception ex) {
137: //
138: }
139: }
140:
141: public Executor getExecutor() {
142: if (useAutomaticWorkQueue) {
143: if (wqm == null) {
144: wqm = new WorkQueueManagerImpl(bus);
145: }
146: return wqm.getAutomaticWorkQueue();
147: } else {
148: return null;
149: }
150: }
151: }
152:
153: public void setupCallbackObject(final boolean useAutomaticWorkQueue) {
154: callback = new TestServerTransportCallback(
155: useAutomaticWorkQueue);
156: }
157:
158: public void doTestJMSTransport(final boolean useAutomaticWorkQueue,
159: QName serviceName, String portName, String testWsdlFileName)
160: throws Exception {
161:
162: String address = "http://localhost:9000/SoapContext/SoapPort";
163: URL wsdlUrl = getClass().getResource(testWsdlFileName);
164: assertNotNull(wsdlUrl);
165:
166: createConfiguration(wsdlUrl, serviceName, portName);
167:
168: ServerTransport server = createServerTransport(wsdlUrl,
169: serviceName, portName, address);
170: setupCallbackObject(useAutomaticWorkQueue);
171:
172: server.activate(callback);
173:
174: TestJMSClientTransport client = createClientTransport(wsdlUrl,
175: serviceName, portName, address);
176: OutputStreamMessageContext octx = client
177: .createOutputStreamContext(new GenericMessageContext());
178:
179: setRequestContextHeader(octx);
180:
181: client.finalPrepareOutputStreamContext(octx);
182:
183: byte outBytes[] = "Hello World!!!".getBytes();
184: octx.getOutputStream().write(outBytes);
185:
186: // make sure that the inner context that we used to create has the same values that we inserted
187: checkContextHeader(client.getContext(),
188: JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
189: InputStreamMessageContext ictx = client.invoke(octx);
190: byte bytes[] = new byte[10000];
191: int len = ictx.getInputStream().read(bytes);
192: assertTrue("Did not read anything " + len, len > 0);
193: assertEquals(new String(outBytes), new String(bytes, 0, len));
194:
195: checkResponseContextHeader(ictx);
196:
197: server.shutdown();
198: client.shutdown();
199: }
200:
201: public void checkResponseContextHeader(MessageContext ctx) {
202: assertTrue("JMSContext should contain the property "
203: + JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, null != ctx
204: .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
205:
206: JMSMessageHeadersType responseHeader = (JMSMessageHeadersType) ctx
207: .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
208: assertTrue("JMSHeader correlation id mismatch: expected "
209: + TEST_CORRELATION_ID, TEST_CORRELATION_ID
210: .equals(responseHeader.getJMSCorrelationID()));
211: assertTrue("JMSRedelivered should be false", !responseHeader
212: .isJMSRedelivered());
213: }
214:
215: public void setRequestContextHeader(OutputStreamMessageContext octx) {
216: JMSMessageHeadersType requestHeader = new JMSMessageHeadersType();
217: requestHeader.setJMSCorrelationID(TEST_CORRELATION_ID);
218: requestHeader.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
219: requestHeader.setJMSExpiration(3600000L);
220: requestHeader.setJMSPriority(6);
221: requestHeader.setTimeToLive(3600000L);
222:
223: octx
224: .put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS,
225: requestHeader);
226: }
227:
228: public void checkContextHeader(MessageContext ctx, String headerName) {
229:
230: assertTrue("JMSContext should contain the property "
231: + headerName, null != ctx.get(headerName));
232:
233: JMSMessageHeadersType reqHdr = (JMSMessageHeadersType) ctx
234: .get(headerName);
235: assertTrue("JMSHeader correlation id mismatch: expected "
236: + TEST_CORRELATION_ID, TEST_CORRELATION_ID
237: .equals(reqHdr.getJMSCorrelationID()));
238: assertTrue("JMSRedelivered should be false", !reqHdr
239: .isSetJMSRedelivered());
240: assertTrue("JMS priority should be 6",
241: reqHdr.getJMSPriority() == 6);
242: assertTrue("JMS timetolive should be greater than 0 ", reqHdr
243: .getTimeToLive() > 0);
244: }
245:
246: private TestJMSClientTransport createClientTransport(URL wsdlUrl,
247: QName serviceName, String portName, String address)
248: throws WSDLException, IOException {
249: EndpointReferenceType ref = EndpointReferenceUtils
250: .getEndpointReference(wsdlUrl, serviceName, portName);
251: return new TestJMSClientTransport(bus, ref, null);
252: }
253:
254: private ServerTransport createServerTransport(URL wsdlUrl,
255: QName serviceName, String portName, String address)
256: throws WSDLException, IOException {
257:
258: EndpointReferenceType ref = EndpointReferenceUtils
259: .getEndpointReference(wsdlUrl, serviceName, portName);
260: EndpointReferenceUtils.setAddress(ref, address);
261: return new JMSServerTransport(bus, ref);
262: }
263:
264: // Create bus and populate the configuration for Endpoint, Service and port.
265: // This test uses all the info. either coming from WSDL or default and do not use
266: // any configuration files.
267: //
268:
269: private void createConfiguration(URL wsdlUrl, QName serviceName,
270: String portName) throws WSDLException, IOException,
271: BusException {
272: assert bus != null;
273: EndpointReferenceType ref = EndpointReferenceUtils
274: .getEndpointReference(wsdlUrl, serviceName, portName);
275: Configuration busCfg = bus.getConfiguration();
276: assert null != busCfg;
277:
278: String id = EndpointReferenceUtils.getServiceName(ref)
279: .toString();
280: ConfigurationBuilder cb = ConfigurationBuilderFactory
281: .getBuilder(null);
282: cb.buildConfiguration(JMSConstants.ENDPOINT_CONFIGURATION_URI,
283: id, busCfg);
284: cb.buildConfiguration(JMSConstants.PORT_CONFIGURATION_URI, id
285: + "/"
286: + EndpointReferenceUtils.getPortName(ref).toString(),
287: busCfg);
288: }
289:
290: public class TestJMSClientTransport extends JMSClientTransport {
291:
292: private MessageContext localContext;
293:
294: public TestJMSClientTransport(Bus theBus,
295: EndpointReferenceType address, ClientBinding binding)
296: throws WSDLException, IOException {
297: super (theBus, address, binding);
298: }
299:
300: @Override
301: public OutputStreamMessageContext createOutputStreamContext(
302: MessageContext context) throws IOException {
303: localContext = context;
304: return new JMSOutputStreamContext(context);
305: }
306:
307: public MessageContext getContext() {
308: return localContext;
309: }
310:
311: }
312:
313: }
|