001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jbi.nmr.flow.jca;
018:
019: import javax.jbi.JBIException;
020:
021: import junit.framework.TestCase;
022:
023: import org.apache.activemq.broker.BrokerService;
024: import org.apache.activemq.xbean.BrokerFactoryBean;
025: import org.apache.servicemix.jbi.RuntimeJBIException;
026: import org.apache.servicemix.jbi.container.ActivationSpec;
027: import org.apache.servicemix.jbi.container.JBIContainer;
028: import org.apache.servicemix.jbi.resolver.ServiceNameEndpointResolver;
029: import org.apache.servicemix.tck.ReceiverComponent;
030: import org.apache.servicemix.tck.SenderComponent;
031: import org.jencks.GeronimoPlatformTransactionManager;
032: import org.springframework.core.io.ClassPathResource;
033: import org.springframework.transaction.TransactionStatus;
034: import org.springframework.transaction.support.TransactionCallback;
035: import org.springframework.transaction.support.TransactionTemplate;
036:
037: /**
038: * @version $Revision: 564607 $
039: */
040: public class JCAFlowTest extends TestCase {
041:
042: private static final int NUM_MESSAGES = 10;
043:
044: private JBIContainer senderContainer = new JBIContainer();
045:
046: private JBIContainer receiverContainer = new JBIContainer();
047:
048: private BrokerService broker;
049:
050: private TransactionTemplate tt;
051:
052: /*
053: * @see TestCase#setUp()
054: */
055: protected void setUp() throws Exception {
056: super .setUp();
057:
058: GeronimoPlatformTransactionManager tm = new GeronimoPlatformTransactionManager();
059: tt = new TransactionTemplate(tm);
060:
061: BrokerFactoryBean bfb = new BrokerFactoryBean(
062: new ClassPathResource(
063: "org/apache/servicemix/jbi/nmr/flow/jca/broker.xml"));
064: bfb.afterPropertiesSet();
065: broker = bfb.getBroker();
066: broker.start();
067:
068: JCAFlow senderFlow = new JCAFlow();
069: senderFlow.setJmsURL("tcp://localhost:61216");
070: senderContainer.setTransactionManager(tm);
071: senderContainer.setEmbedded(true);
072: senderContainer.setName("senderContainer");
073: senderContainer.setFlow(senderFlow);
074: senderContainer.setMonitorInstallationDirectory(false);
075: senderContainer.init();
076: senderContainer.start();
077:
078: JCAFlow receiverFlow = new JCAFlow();
079: receiverFlow.setJmsURL("tcp://localhost:61216");
080: receiverContainer.setTransactionManager(tm);
081: receiverContainer.setEmbedded(true);
082: receiverContainer.setName("receiverContainer");
083: receiverContainer.setFlow(receiverFlow);
084: receiverContainer.setMonitorInstallationDirectory(false);
085: receiverContainer.init();
086: receiverContainer.start();
087: }
088:
089: protected void tearDown() throws Exception {
090: super .tearDown();
091: senderContainer.shutDown();
092: receiverContainer.shutDown();
093: broker.stop();
094: }
095:
096: public void testInOnly() throws Exception {
097: final SenderComponent sender = new SenderComponent();
098: final ReceiverComponent receiver = new ReceiverComponent();
099: sender.setResolver(new ServiceNameEndpointResolver(
100: ReceiverComponent.SERVICE));
101:
102: senderContainer.activateComponent(new ActivationSpec("sender",
103: sender));
104: receiverContainer.activateComponent(new ActivationSpec(
105: "receiver", receiver));
106:
107: Thread.sleep(5000);
108:
109: sender.sendMessages(NUM_MESSAGES);
110: Thread.sleep(3000);
111: receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
112: }
113:
114: public void testTxInOnly() throws Exception {
115: final SenderComponent sender = new SenderComponent();
116: final ReceiverComponent receiver = new ReceiverComponent();
117: sender.setResolver(new ServiceNameEndpointResolver(
118: ReceiverComponent.SERVICE));
119:
120: senderContainer.activateComponent(new ActivationSpec("sender",
121: sender));
122: receiverContainer.activateComponent(new ActivationSpec(
123: "receiver", receiver));
124:
125: Thread.sleep(5000);
126:
127: senderContainer.setAutoEnlistInTransaction(true);
128: tt.execute(new TransactionCallback() {
129: public Object doInTransaction(TransactionStatus status) {
130: try {
131: sender.sendMessages(NUM_MESSAGES);
132: } catch (JBIException e) {
133: throw new RuntimeJBIException(e);
134: }
135: return null;
136: }
137: });
138: Thread.sleep(3000);
139: receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
140: }
141:
142: public void testClusteredInOnly() throws Exception {
143: final SenderComponent sender = new SenderComponent();
144: final ReceiverComponent receiver1 = new ReceiverComponent();
145: final ReceiverComponent receiver2 = new ReceiverComponent();
146: sender.setResolver(new ServiceNameEndpointResolver(
147: ReceiverComponent.SERVICE));
148:
149: senderContainer.activateComponent(new ActivationSpec("sender",
150: sender));
151: senderContainer.activateComponent(new ActivationSpec(
152: "receiver", receiver1));
153: receiverContainer.activateComponent(new ActivationSpec(
154: "receiver", receiver2));
155: Thread.sleep(1000);
156:
157: sender.sendMessages(NUM_MESSAGES);
158: Thread.sleep(3000);
159: assertTrue(receiver1.getMessageList().hasReceivedMessage());
160: assertTrue(receiver2.getMessageList().hasReceivedMessage());
161: receiver1.getMessageList().flushMessages();
162: receiver2.getMessageList().flushMessages();
163:
164: senderContainer.deactivateComponent("receiver");
165: Thread.sleep(1000);
166:
167: sender.sendMessages(NUM_MESSAGES);
168: Thread.sleep(3000);
169: assertFalse(receiver1.getMessageList().hasReceivedMessage());
170: assertTrue(receiver2.getMessageList().hasReceivedMessage());
171: receiver1.getMessageList().flushMessages();
172: receiver2.getMessageList().flushMessages();
173:
174: senderContainer.activateComponent(new ActivationSpec(
175: "receiver", receiver1));
176: receiverContainer.deactivateComponent("receiver");
177: Thread.sleep(1000);
178:
179: sender.sendMessages(NUM_MESSAGES);
180: Thread.sleep(3000);
181: assertTrue(receiver1.getMessageList().hasReceivedMessage());
182: assertFalse(receiver2.getMessageList().hasReceivedMessage());
183: receiver1.getMessageList().flushMessages();
184: receiver2.getMessageList().flushMessages();
185: }
186:
187: }
|