001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.jms;
019:
020: import java.io.ByteArrayInputStream;
021: import java.io.IOException;
022: import java.io.InputStream;
023:
024: import org.apache.cxf.BusFactory;
025: import org.apache.cxf.bus.spring.SpringBusFactory;
026: import org.apache.cxf.message.Exchange;
027: import org.apache.cxf.message.ExchangeImpl;
028: import org.apache.cxf.message.Message;
029: import org.apache.cxf.message.MessageImpl;
030: import org.apache.cxf.transport.Conduit;
031: import org.apache.cxf.transport.ConduitInitiator;
032: import org.apache.cxf.transport.MessageObserver;
033: import org.apache.cxf.transport.MultiplexDestination;
034: import org.easymock.classextension.EasyMock;
035: import org.junit.BeforeClass;
036: import org.junit.Test;
037:
038: public class JMSDestinationTest extends AbstractJMSTester {
039: private Message destMessage;
040:
041: @BeforeClass
042: public static void createAndStartBroker() throws Exception {
043: startBroker(new JMSBrokerSetup("tcp://localhost:61500"));
044: }
045:
046: private void waitForReceiveInMessage() {
047: int waitTime = 0;
048: while (inMessage == null && waitTime < 3000) {
049: try {
050: Thread.sleep(1000);
051: } catch (InterruptedException e) {
052: // do nothing here
053: }
054: waitTime = waitTime + 1000;
055: }
056: assertTrue("Can't receive the Conduit Message in 3 seconds",
057: inMessage != null);
058: }
059:
060: private void waitForReceiveDestMessage() {
061: int waitTime = 0;
062: while (destMessage == null && waitTime < 3000) {
063: try {
064: Thread.sleep(1000);
065: } catch (InterruptedException e) {
066: // do nothing here
067: }
068: waitTime = waitTime + 1000;
069: }
070: assertTrue(
071: "Can't receive the Destination message in 3 seconds",
072: destMessage != null);
073: }
074:
075: public JMSDestination setupJMSDestination(boolean send)
076: throws IOException {
077: ConduitInitiator conduitInitiator = EasyMock
078: .createMock(ConduitInitiator.class);
079: JMSDestination jmsDestination = new JMSDestination(bus,
080: conduitInitiator, endpointInfo);
081: if (send) {
082: // setMessageObserver
083: observer = new MessageObserver() {
084: public void onMessage(Message m) {
085: Exchange exchange = new ExchangeImpl();
086: exchange.setInMessage(m);
087: m.setExchange(exchange);
088: destMessage = m;
089: }
090: };
091: jmsDestination.setMessageObserver(observer);
092: }
093: return jmsDestination;
094: }
095:
096: @Test
097: public void testGetConfigurationFromSpring() throws Exception {
098: SpringBusFactory bf = new SpringBusFactory();
099: BusFactory.setDefaultBus(null);
100: bus = bf.createBus("/jms_test_config.xml");
101: BusFactory.setDefaultBus(bus);
102: setupServiceInfo("http://cxf.apache.org/jms_conf_test",
103: "/wsdl/jms_test_no_addr.wsdl",
104: "HelloWorldQueueBinMsgService",
105: "HelloWorldQueueBinMsgPort");
106: JMSDestination destination = setupJMSDestination(false);
107: assertEquals(
108: "Can't get the right ServerConfig's MessageTimeToLive ",
109: 500L, destination.getServerConfig()
110: .getMessageTimeToLive());
111: assertEquals("Can't get the right Server's MessageSelector",
112: "cxf_message_selector", destination.getRuntimePolicy()
113: .getMessageSelector());
114: assertEquals(
115: "Can't get the right SessionPoolConfig's LowWaterMark",
116: 10, destination.getSessionPool().getLowWaterMark());
117: assertEquals(
118: "Can't get the right AddressPolicy's ConnectionPassword",
119: "testPassword", destination.getJMSAddress()
120: .getConnectionPassword());
121: assertEquals("Can't get the right DurableSubscriberName",
122: "cxf_subscriber", destination.getRuntimePolicy()
123: .getDurableSubscriberName());
124: assertEquals("Can't get the right MessageSelectorName",
125: "cxf_message_selector", destination.getRuntimePolicy()
126: .getMessageSelector());
127: BusFactory.setDefaultBus(null);
128:
129: }
130:
131: @Test
132: public void testGetConfigurationFormWSDL() throws Exception {
133: SpringBusFactory bf = new SpringBusFactory();
134: BusFactory.setDefaultBus(null);
135: bus = bf.createBus();
136: BusFactory.setDefaultBus(bus);
137: setupServiceInfo("http://cxf.apache.org/hello_world_jms",
138: "/wsdl/jms_test.wsdl", "HelloWorldQueueBinMsgService",
139: "HelloWorldQueueBinMsgPort");
140:
141: JMSDestination destination = setupJMSDestination(false);
142:
143: assertEquals("Can't get the right DurableSubscriberName",
144: "CXF_subscriber", destination.getRuntimePolicy()
145: .getDurableSubscriberName());
146:
147: assertEquals(
148: "Can't get the right AddressPolicy's ConnectionPassword",
149: "dynamicQueues/test.jmstransport.binary", destination
150: .getJMSAddress().getJndiDestinationName());
151:
152: BusFactory.setDefaultBus(null);
153:
154: }
155:
156: @Test
157: public void testDurableSubscriber() throws Exception {
158: SpringBusFactory bf = new SpringBusFactory();
159: BusFactory.setDefaultBus(null);
160: bus = bf.createBus("/wsdl/jms_test_config.xml");
161: BusFactory.setDefaultBus(bus);
162: destMessage = null;
163: inMessage = null;
164: setupServiceInfo("http://cxf.apache.org/hello_world_jms",
165: "/wsdl/jms_test.wsdl", "HelloWorldPubSubService",
166: "HelloWorldPubSubPort");
167: JMSConduit conduit = setupJMSConduit(true, false);
168: Message outMessage = new MessageImpl();
169: setupMessageHeader(outMessage);
170: JMSDestination destination = null;
171: try {
172: destination = setupJMSDestination(true);
173: //destination.activate();
174: } catch (IOException e) {
175: assertFalse(
176: "The JMSDestination activate should not through exception ",
177: false);
178: e.printStackTrace();
179: }
180: sendoutMessage(conduit, outMessage, true);
181: // wait for the message to be get from the destination
182: waitForReceiveDestMessage();
183: // just verify the Destination inMessage
184: assertTrue("The destiantion should have got the message ",
185: destMessage != null);
186: verifyReceivedMessage(destMessage);
187: verifyHeaders(destMessage, outMessage);
188: destination.shutdown();
189: }
190:
191: @Test
192: public void testOneWayDestination() throws Exception {
193: destMessage = null;
194: inMessage = null;
195: setupServiceInfo("http://cxf.apache.org/hello_world_jms",
196: "/wsdl/jms_test.wsdl", "HWStaticReplyQBinMsgService",
197: "HWStaticReplyQBinMsgPort");
198: JMSConduit conduit = setupJMSConduit(true, false);
199: Message outMessage = new MessageImpl();
200: setupMessageHeader(outMessage);
201: JMSDestination destination = null;
202: try {
203: destination = setupJMSDestination(true);
204: //destination.activate();
205: } catch (IOException e) {
206: assertFalse(
207: "The JMSDestination activate should not through exception ",
208: false);
209: e.printStackTrace();
210: }
211: sendoutMessage(conduit, outMessage, true);
212: // wait for the message to be get from the destination
213: waitForReceiveDestMessage();
214: // just verify the Destination inMessage
215: assertTrue("The destiantion should have got the message ",
216: destMessage != null);
217: verifyReceivedMessage(destMessage);
218: verifyHeaders(destMessage, outMessage);
219: destination.shutdown();
220: }
221:
222: private void setupMessageHeader(Message outMessage) {
223: JMSMessageHeadersType header = new JMSMessageHeadersType();
224: header.setJMSCorrelationID("Destination test");
225: header.setJMSDeliveryMode(3);
226: header.setJMSPriority(1);
227: header.setTimeToLive(1000);
228: outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
229: }
230:
231: private void verifyReceivedMessage(Message inMessage) {
232: ByteArrayInputStream bis = (ByteArrayInputStream) inMessage
233: .getContent(InputStream.class);
234: byte bytes[] = new byte[bis.available()];
235: try {
236: bis.read(bytes);
237: } catch (IOException ex) {
238: assertFalse("Read the Destination recieved Message error ",
239: false);
240: ex.printStackTrace();
241: }
242: String reponse = new String(bytes);
243: assertEquals("The reponse date should be equals", reponse,
244: "HelloWorld");
245: }
246:
247: private void verifyRequestResponseHeaders(Message inMessage,
248: Message outMessage) {
249: JMSMessageHeadersType outHeader = (JMSMessageHeadersType) outMessage
250: .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
251:
252: JMSMessageHeadersType inHeader = (JMSMessageHeadersType) inMessage
253: .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
254:
255: verifyJmsHeaderEquality(outHeader, inHeader);
256:
257: }
258:
259: private void verifyHeaders(Message inMessage, Message outMessage) {
260: JMSMessageHeadersType outHeader = (JMSMessageHeadersType) outMessage
261: .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
262:
263: JMSMessageHeadersType inHeader = (JMSMessageHeadersType) inMessage
264: .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
265:
266: verifyJmsHeaderEquality(outHeader, inHeader);
267:
268: }
269:
270: private void verifyJmsHeaderEquality(
271: JMSMessageHeadersType outHeader,
272: JMSMessageHeadersType inHeader) {
273: assertEquals(
274: "The inMessage and outMessage JMS Header's CorrelationID should be equals",
275: outHeader.getJMSCorrelationID(), inHeader
276: .getJMSCorrelationID());
277: assertEquals(
278: "The inMessage and outMessage JMS Header's JMSPriority should be equals",
279: outHeader.getJMSPriority(), inHeader.getJMSPriority());
280: assertEquals(
281: "The inMessage and outMessage JMS Header's JMSType should be equals",
282: outHeader.getJMSType(), inHeader.getJMSType());
283:
284: }
285:
286: @Test
287: public void testRoundTripDestination() throws Exception {
288:
289: inMessage = null;
290: setupServiceInfo("http://cxf.apache.org/hello_world_jms",
291: "/wsdl/jms_test.wsdl", "HelloWorldService",
292: "HelloWorldPort");
293: //set up the conduit send to be true
294: JMSConduit conduit = setupJMSConduit(true, false);
295: final Message outMessage = new MessageImpl();
296: setupMessageHeader(outMessage);
297: final JMSDestination destination = setupJMSDestination(true);
298:
299: //set up MessageObserver for handlering the conduit message
300: MessageObserver observer = new MessageObserver() {
301: public void onMessage(Message m) {
302: Exchange exchange = new ExchangeImpl();
303: exchange.setInMessage(m);
304: m.setExchange(exchange);
305: verifyReceivedMessage(m);
306: verifyHeaders(m, outMessage);
307: //setup the message for
308: Conduit backConduit;
309: try {
310: backConduit = destination.getBackChannel(m, null,
311: null);
312: //wait for the message to be got from the conduit
313: Message replyMessage = new MessageImpl();
314: sendoutMessage(backConduit, replyMessage, true);
315: } catch (Exception e) {
316: // TODO Auto-generated catch block
317: e.printStackTrace();
318: }
319: }
320: };
321: destination.setMessageObserver(observer);
322: //set is oneway false for get response from destination
323: sendoutMessage(conduit, outMessage, false);
324: //wait for the message to be got from the destination,
325: // create the thread to handler the Destination incomming message
326:
327: waitForReceiveInMessage();
328: verifyReceivedMessage(inMessage);
329: // wait for a while for the jms session recycling
330: Thread.sleep(1000);
331: destination.shutdown();
332: }
333:
334: @Test
335: public void testPropertyExclusion() throws Exception {
336:
337: final String customPropertyName = "THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED";
338:
339: inMessage = null;
340: setupServiceInfo("http://cxf.apache.org/hello_world_jms",
341: "/wsdl/jms_test.wsdl", "HelloWorldService",
342: "HelloWorldPort");
343: //set up the conduit send to be true
344: JMSConduit conduit = setupJMSConduit(true, false);
345: final Message outMessage = new MessageImpl();
346: setupMessageHeader(outMessage);
347:
348: JMSPropertyType excludeProp = new JMSPropertyType();
349: excludeProp.setName(customPropertyName);
350: excludeProp.setValue(customPropertyName);
351:
352: JMSMessageHeadersType headers = (JMSMessageHeadersType) outMessage
353: .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
354: headers.getProperty().add(excludeProp);
355:
356: final JMSDestination destination = setupJMSDestination(true);
357:
358: //set up MessageObserver for handlering the conduit message
359: MessageObserver observer = new MessageObserver() {
360: public void onMessage(Message m) {
361: Exchange exchange = new ExchangeImpl();
362: exchange.setInMessage(m);
363: m.setExchange(exchange);
364: verifyReceivedMessage(m);
365: verifyHeaders(m, outMessage);
366: //setup the message for
367: Conduit backConduit;
368: try {
369: backConduit = destination.getBackChannel(m, null,
370: null);
371: //wait for the message to be got from the conduit
372: Message replyMessage = new MessageImpl();
373: sendoutMessage(backConduit, replyMessage, true);
374: } catch (Exception e) {
375: // TODO Auto-generated catch block
376: e.printStackTrace();
377: }
378: }
379: };
380: destination.setMessageObserver(observer);
381: //set is oneway false for get response from destination
382: sendoutMessage(conduit, outMessage, false);
383: //wait for the message to be got from the destination,
384: // create the thread to handler the Destination incomming message
385:
386: waitForReceiveInMessage();
387: verifyReceivedMessage(inMessage);
388:
389: verifyRequestResponseHeaders(inMessage, outMessage);
390:
391: JMSMessageHeadersType inHeader = (JMSMessageHeadersType) inMessage
392: .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
393: assertTrue("property has been excluded", inHeader.getProperty()
394: .isEmpty());
395:
396: // wait for a while for the jms session recycling
397: Thread.sleep(1000);
398: destination.shutdown();
399: }
400:
401: @Test
402: public void testIsMultiplexCapable() throws Exception {
403: inMessage = null;
404: setupServiceInfo("http://cxf.apache.org/hello_world_jms",
405: "/wsdl/jms_test.wsdl", "HelloWorldService",
406: "HelloWorldPort");
407: final JMSDestination destination = setupJMSDestination(true);
408: assertTrue("is multiplex",
409: destination instanceof MultiplexDestination);
410: }
411: }
|