001: package org.objectweb.celtix.bus.transports.jms;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.net.URL;
006: import java.util.List;
007: import java.util.concurrent.Executor;
008:
009: import javax.wsdl.WSDLException;
010: import javax.xml.namespace.QName;
011: import javax.xml.ws.handler.MessageContext;
012:
013: import junit.framework.Test;
014: import junit.framework.TestCase;
015: import junit.framework.TestSuite;
016:
017: import org.objectweb.celtix.Bus;
018: import org.objectweb.celtix.BusException;
019: import org.objectweb.celtix.bus.bindings.TestClientBinding;
020: import org.objectweb.celtix.bus.transports.TransportFactoryManagerImpl;
021: import org.objectweb.celtix.bus.workqueue.WorkQueueManagerImpl;
022:
023: import org.objectweb.celtix.configuration.Configuration;
024: import org.objectweb.celtix.configuration.ConfigurationBuilder;
025: import org.objectweb.celtix.configuration.ConfigurationBuilderFactory;
026: import org.objectweb.celtix.configuration.types.ClassNamespaceMappingListType;
027: import org.objectweb.celtix.configuration.types.ClassNamespaceMappingType;
028: import org.objectweb.celtix.configuration.types.ObjectFactory;
029: import org.objectweb.celtix.context.GenericMessageContext;
030: import org.objectweb.celtix.context.InputStreamMessageContext;
031: import org.objectweb.celtix.context.OutputStreamMessageContext;
032: import org.objectweb.celtix.transports.ClientTransport;
033: import org.objectweb.celtix.transports.ServerTransport;
034: import org.objectweb.celtix.transports.ServerTransportCallback;
035: import org.objectweb.celtix.transports.TransportFactory;
036: import org.objectweb.celtix.transports.TransportFactoryManager;
037: import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType;
038: import org.objectweb.celtix.transports.jms.context.JMSPropertyType;
039: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
040: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
041:
042: public class JMSTransportTest extends TestCase {
043:
044: public static final String JMSTRANSPORT_SKIP_RESPONSE = "JMSTransport.skipResponse";
045: private ServerTransportCallback callback;
046: private ServerTransportCallback callback1;
047: private Bus bus;
048: private String serverRcvdInOneWayCall;
049: private WorkQueueManagerImpl wqm;
050:
/**
 * Creates a named test case.
 *
 * @param arg0 the test name, passed straight to the superclass constructor
 */
public JMSTransportTest(String arg0) {
    super(arg0);
}
054:
055: public static void main(String[] args) {
056: junit.textui.TestRunner.run(JMSTransportTest.suite());
057: }
058:
059: public static Test suite() {
060: TestSuite suite = new TestSuite(JMSTransportTest.class);
061: return new JMSBrokerSetup(suite, "tcp://localhost:61500");
062: }
063:
064: public void setUp() throws Exception {
065: bus = Bus.init();
066: }
067:
068: public void tearDown() throws Exception {
069: if (wqm != null) {
070: wqm.shutdown(true);
071: }
072: if (bus != null) {
073: bus.shutdown(true);
074: }
075: }
076:
077: public void testOneWayTextQueueJMSTransport() throws Exception {
078: QName serviceName = new QName(
079: "http://celtix.objectweb.org/hello_world_jms",
080: "HelloWorldOneWayQueueService");
081: doOneWayTestJMSTranport(false, serviceName,
082: "HelloWorldOneWayQueuePort",
083: "dynamicQueues/test.jmstransport.oneway");
084: }
085:
086: public void testPubSubJMSTransport() throws Exception {
087: QName serviceName = new QName(
088: "http://celtix.objectweb.org/hello_world_jms",
089: "HelloWorldPubSubService");
090: doOneWayTestJMSTranport(false, serviceName,
091: "HelloWorldPubSubPort",
092: "dynamicTopics/test.jmstransport.oneway.topic");
093: }
094:
095: public void testTwoWayTextQueueJMSTransport() throws Exception {
096: QName serviceName = new QName(
097: "http://celtix.objectweb.org/hello_world_jms",
098: "HelloWorldService");
099: doTestJMSTransport(false, serviceName, "HelloWorldPort",
100: "dynamicQueues/test.jmstransport.text");
101: }
102:
103: public void testTwoWayBinaryQueueJMSTransport() throws Exception {
104: QName serviceName = new QName(
105: "http://celtix.objectweb.org/hello_world_jms",
106: "HelloWorldQueueBinMsgService");
107: doTestJMSTransport(false, serviceName,
108: "HelloWorldQueueBinMsgPort",
109: "dynamicQueues/test.jmstransport.binary");
110: }
111:
112: public void test2WayStaticReplyQTextMessageJMSTransport()
113: throws Exception {
114: QName serviceName = new QName(
115: "http://celtix.objectweb.org/hello_world_jms",
116: "HWStaticReplyQTextMsgService");
117: doTestJMSTransport(false, serviceName,
118: "HWStaticReplyQTextPort",
119: "dynamicQueues/test.jmstransport.text");
120: }
121:
122: private int readBytes(byte bytes[], InputStream ins)
123: throws IOException {
124: int len = ins.read(bytes);
125: int total = 0;
126: while (len != -1) {
127: total += len;
128: len = ins.read(bytes, total, bytes.length - total);
129: }
130: return total;
131: }
132:
133: public class TestServerTransportCallback implements
134: ServerTransportCallback {
135: boolean useAutomaticWorkQueue;
136:
137: public TestServerTransportCallback(boolean useAutoWQ) {
138: useAutomaticWorkQueue = useAutoWQ;
139: }
140:
141: public void dispatch(InputStreamMessageContext ctx,
142: ServerTransport transport) {
143:
144: try {
145: byte bytes[] = new byte[10000];
146: if (ctx.containsKey(JMSConstants.JMS_SERVER_HEADERS)) {
147: JMSMessageHeadersType msgHdr = (JMSMessageHeadersType) ctx
148: .get(JMSConstants.JMS_SERVER_HEADERS);
149: if (msgHdr.getProperty().contains(
150: JMSTRANSPORT_SKIP_RESPONSE)) {
151: //no need to process the response.
152: return;
153: }
154: }
155:
156: int total = readBytes(bytes, ctx.getInputStream());
157:
158: JMSOutputStreamContext octx = (JMSOutputStreamContext) transport
159: .createOutputStreamContext(ctx);
160: octx.setOneWay(false);
161: transport.finalPrepareOutputStreamContext(octx);
162: octx.getOutputStream().write(bytes, 0, total);
163: octx.getOutputStream().flush();
164:
165: MessageContext replyCtx = new GenericMessageContext();
166: ctx.put("ObjectMessageContext.MESSAGE_INPUT",
167: Boolean.TRUE);
168: replyCtx.putAll(ctx);
169: replyCtx.put("ObjectMessageContext.MESSAGE_INPUT",
170: Boolean.TRUE);
171:
172: ((JMSServerTransport) transport).postDispatch(replyCtx,
173: octx);
174: octx.getOutputStream().close();
175: } catch (Exception ex) {
176: //
177: }
178: }
179:
180: public Executor getExecutor() {
181: if (useAutomaticWorkQueue) {
182: if (wqm == null) {
183: wqm = new WorkQueueManagerImpl(bus);
184: }
185: return wqm.getAutomaticWorkQueue();
186: } else {
187: return null;
188: }
189: }
190: }
191:
192: public void setupCallbackObject(final boolean useAutomaticWorkQueue) {
193: callback = new TestServerTransportCallback(
194: useAutomaticWorkQueue);
195: }
196:
197: public void doTestJMSTransport(final boolean useAutomaticWorkQueue,
198: QName serviceName, String portName,
199: String jndiDestinationName) throws Exception {
200:
201: String address = "jms:ConnectionFactory#" + jndiDestinationName;
202: URL wsdlUrl = getClass().getResource("/wsdl/jms_test.wsdl");
203: assertNotNull(wsdlUrl);
204:
205: createConfiguration(wsdlUrl, serviceName, portName);
206: TransportFactory factory = createTransportFactory();
207:
208: ServerTransport server = createServerTransport(factory,
209: wsdlUrl, serviceName, portName, address);
210: setupCallbackObject(useAutomaticWorkQueue);
211:
212: server.activate(callback);
213:
214: ClientTransport client = createClientTransport(factory,
215: wsdlUrl, serviceName, portName);
216: assertTrue("targetEndpoint address mismatch. Expected : "
217: + address + " received : "
218: + client.getTargetEndpoint(), address.equals(client
219: .getTargetEndpoint().getAddress().getValue()));
220:
221: OutputStreamMessageContext octx = null;
222: byte outBytes[] = "Hello World!!!".getBytes();
223: InputStreamMessageContext ictx = doClientInvoke(client, octx,
224: outBytes, false);
225:
226: byte bytes[] = new byte[10000];
227: int len = ictx.getInputStream().read(bytes);
228: assertTrue("Did not read anything " + len, len > 0);
229: assertEquals(new String(outBytes), new String(bytes, 0, len));
230:
231: //long request
232: outBytes = new byte[5000];
233: for (int x = 0; x < outBytes.length; x++) {
234: outBytes[x] = (byte) ('a' + (x % 26));
235: }
236:
237: ictx = doClientInvoke(client, octx, outBytes, false);
238: int total = readBytes(bytes, ictx.getInputStream());
239:
240: assertTrue("Did not read anything " + total, total > 0);
241: assertEquals(new String(outBytes), new String(bytes, 0, total));
242:
243: outBytes = "Hello World!!!".getBytes();
244:
245: server.deactivate();
246:
247: try {
248: ictx = doClientInvoke(client, octx, outBytes, true);
249: len = ictx.getInputStream().read(bytes);
250:
251: if (len != -1) {
252: fail("was able to process a message after the servant was deactivated: "
253: + len + " - " + new String(bytes));
254: }
255: } catch (IOException ex) {
256: //ignore - this is what we want
257: }
258:
259: server.activate(callback);
260:
261: outBytes = "New String and must match with response".getBytes();
262: ictx = doClientInvoke(client, octx, outBytes, false);
263: len = ictx.getInputStream().read(bytes);
264: assertTrue("Did not read anything " + len, len > 0);
265: assertEquals(new String(outBytes), new String(bytes, 0, len));
266: server.shutdown();
267: client.shutdown();
268: }
269:
270: public InputStreamMessageContext doClientInvoke(
271: ClientTransport client, OutputStreamMessageContext octx,
272: byte[] outBytes, boolean insertContextInfo)
273: throws Exception {
274: octx = client
275: .createOutputStreamContext(new GenericMessageContext());
276: client.finalPrepareOutputStreamContext(octx);
277: octx.getOutputStream().write(outBytes);
278: if (insertContextInfo) {
279: insertContextInfo(octx);
280: }
281: return client.invoke(octx);
282: }
283:
284: public void insertContextInfo(OutputStreamMessageContext octx) {
285: //Set time to live and default receive timeout so as to timeout the client
286: JMSMessageHeadersType requestHeader = new JMSMessageHeadersType();
287: requestHeader.setTimeToLive(100L);
288: List<JMSPropertyType> props = requestHeader.getProperty();
289: JMSPropertyType skipResponseProperty = new JMSPropertyType();
290: skipResponseProperty.setName(JMSTRANSPORT_SKIP_RESPONSE);
291: skipResponseProperty.setValue("true");
292: props.add(skipResponseProperty);
293: octx
294: .put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS,
295: requestHeader);
296: octx.put(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT, new Long(10));
297:
298: }
299:
300: public void setupOneWayCallbackObject(
301: final boolean useAutomaticWorkQueue) {
302: callback1 = new ServerTransportCallback() {
303: public void dispatch(InputStreamMessageContext ctx,
304: ServerTransport transport) {
305: try {
306: byte bytes[] = new byte[10000];
307: readBytes(bytes, ctx.getInputStream());
308:
309: JMSOutputStreamContext octx = (JMSOutputStreamContext) transport
310: .createOutputStreamContext(ctx);
311: octx.setOneWay(true);
312: transport.finalPrepareOutputStreamContext(octx);
313: serverRcvdInOneWayCall = new String(bytes);
314:
315: MessageContext replyCtx = new GenericMessageContext();
316: ctx.put("ObjectMessageContext.MESSAGE_INPUT",
317: Boolean.TRUE);
318: replyCtx.putAll(ctx);
319: replyCtx.put("ObjectMessageContext.MESSAGE_INPUT",
320: Boolean.TRUE);
321:
322: ((JMSServerTransport) transport).postDispatch(
323: replyCtx, octx);
324: octx.getOutputStream().close();
325: } catch (Exception ex) {
326: ex.printStackTrace();
327: }
328: }
329:
330: public Executor getExecutor() {
331: if (useAutomaticWorkQueue) {
332: if (wqm == null) {
333: wqm = new WorkQueueManagerImpl(bus);
334: }
335: return wqm.getAutomaticWorkQueue();
336: } else {
337: return null;
338: }
339:
340: }
341: };
342: }
343:
344: public void doOneWayTestJMSTranport(
345: final boolean useAutomaticWorkQueue, QName serviceName,
346: String portName, String jndiDestinationName)
347: throws Exception {
348:
349: String address = "jms:ConnectionFactory#" + jndiDestinationName;
350: URL wsdlUrl = getClass().getResource("/wsdl/jms_test.wsdl");
351: assertNotNull(wsdlUrl);
352:
353: createConfiguration(wsdlUrl, serviceName, portName);
354: TransportFactory factory = createTransportFactory();
355: setupOneWayCallbackObject(useAutomaticWorkQueue);
356:
357: ServerTransport server = createServerTransport(factory,
358: wsdlUrl, serviceName, portName, address);
359:
360: server.activate(callback1);
361:
362: ClientTransport client = createClientTransport(factory,
363: wsdlUrl, serviceName, portName);
364:
365: assertTrue("targetEndpoint address mismatch. Expected : "
366: + address + " received : "
367: + client.getTargetEndpoint(), address.equals(client
368: .getTargetEndpoint().getAddress().getValue()));
369: OutputStreamMessageContext octx = client
370: .createOutputStreamContext(new GenericMessageContext());
371: client.finalPrepareOutputStreamContext(octx);
372: byte outBytes[] = "Hello World!!!".getBytes();
373: octx.getOutputStream().write(outBytes);
374: client.invokeOneway(octx);
375: Thread.sleep(500L);
376: assertEquals(new String(outBytes), serverRcvdInOneWayCall
377: .substring(0, outBytes.length));
378:
379: server.shutdown();
380: client.shutdown();
381: }
382:
383: private TransportFactory createTransportFactory()
384: throws BusException {
385: String transportId = "http://celtix.objectweb.org/transports/jms";
386: ObjectFactory of = new ObjectFactory();
387: ClassNamespaceMappingListType mappings = of
388: .createClassNamespaceMappingListType();
389: ClassNamespaceMappingType mapping = of
390: .createClassNamespaceMappingType();
391: mapping
392: .setClassname("org.objectweb.celtix.bus.transports.jms.JMSTransportFactory");
393: mapping.getNamespace().add(transportId);
394: mappings.getMap().add(mapping);
395: TransportFactoryManager tfm = new TransportFactoryManagerImpl(
396: bus);
397: return tfm.getTransportFactory(transportId);
398: }
399:
400: private ClientTransport createClientTransport(
401: TransportFactory factory, URL wsdlUrl, QName serviceName,
402: String portName) throws WSDLException, IOException {
403: EndpointReferenceType ref = EndpointReferenceUtils
404: .getEndpointReference(wsdlUrl, serviceName, portName);
405: ClientTransport transport = factory.createClientTransport(ref,
406: new TestClientBinding(bus, ref));
407:
408: return transport;
409: }
410:
411: private ServerTransport createServerTransport(
412: TransportFactory factory, URL wsdlUrl, QName serviceName,
413: String portName, String address) throws WSDLException,
414: IOException {
415:
416: EndpointReferenceType ref = EndpointReferenceUtils
417: .getEndpointReference(wsdlUrl, serviceName, portName);
418: EndpointReferenceUtils.setAddress(ref, address);
419: return factory.createServerTransport(ref);
420: }
421:
// Requires an initialised bus and populates its configuration for the
// endpoint, service and port. This test takes all of its information either
// from the WSDL or from defaults and does not use any configuration files.
//
426:
427: private void createConfiguration(URL wsdlUrl, QName serviceName,
428: String portName) throws WSDLException, IOException,
429: BusException {
430: assert bus != null;
431: EndpointReferenceType ref = EndpointReferenceUtils
432: .getEndpointReference(wsdlUrl, serviceName, portName);
433: Configuration busCfg = bus.getConfiguration();
434: assert null != busCfg;
435: String id = EndpointReferenceUtils.getServiceName(ref)
436: .toString();
437: ConfigurationBuilder cb = ConfigurationBuilderFactory
438: .getBuilder(null);
439: cb.buildConfiguration(JMSConstants.ENDPOINT_CONFIGURATION_URI,
440: id, busCfg);
441: cb.buildConfiguration(JMSConstants.PORT_CONFIGURATION_URI, id
442: + "/"
443: + EndpointReferenceUtils.getPortName(ref).toString(),
444: busCfg);
445: }
446: }
|