001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.systest.ws.rm;
019:
020: import java.math.BigInteger;
021: import java.util.Collection;
022: import java.util.logging.Logger;
023:
024: import javax.xml.ws.Endpoint;
025:
026: import org.apache.cxf.Bus;
027: import org.apache.cxf.BusFactory;
028: import org.apache.cxf.bus.spring.SpringBusFactory;
029: import org.apache.cxf.endpoint.Client;
030: import org.apache.cxf.frontend.ClientProxy;
031: import org.apache.cxf.greeter_control.Greeter;
032: import org.apache.cxf.greeter_control.GreeterService;
033: import org.apache.cxf.interceptor.LoggingInInterceptor;
034: import org.apache.cxf.interceptor.LoggingOutInterceptor;
035: import org.apache.cxf.systest.ws.policy.GreeterImpl;
036: import org.apache.cxf.systest.ws.util.InMessageRecorder;
037: import org.apache.cxf.systest.ws.util.MessageFlow;
038: import org.apache.cxf.systest.ws.util.MessageRecorder;
039: import org.apache.cxf.systest.ws.util.OutMessageRecorder;
040: import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
041: import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
042: import org.apache.cxf.ws.rm.DestinationSequence;
043: import org.apache.cxf.ws.rm.RMConstants;
044: import org.apache.cxf.ws.rm.RMManager;
045: import org.apache.cxf.ws.rm.RMUtils;
046: import org.apache.cxf.ws.rm.SourceSequence;
047: import org.apache.cxf.ws.rm.persistence.RMMessage;
048: import org.apache.cxf.ws.rm.persistence.RMStore;
049: import org.apache.cxf.ws.rm.persistence.jdbc.RMTxStore;
050: import org.junit.AfterClass;
051: import org.junit.BeforeClass;
052: import org.junit.Test;
053:
054: /**
055: * Tests the addition of WS-RM properties to application messages and the
056: * exchange of WS-RM protocol messages.
057: */
058: public class ClientPersistenceTest extends
059: AbstractBusClientServerTestBase {
060:
061: public static final String GREETMEONEWAY_ACTION = null;
062: public static final String GREETME_ACTION = null;
063: private static final Logger LOG = Logger
064: .getLogger(ClientPersistenceTest.class.getName());
065:
066: private Greeter greeter;
067: private OutMessageRecorder out;
068: private InMessageRecorder in;
069:
070: public static class Server extends AbstractBusTestServerBase {
071:
072: protected void run() {
073: SpringBusFactory bf = new SpringBusFactory();
074: Bus bus = bf
075: .createBus("/org/apache/cxf/systest/ws/rm/persistent.xml");
076: BusFactory.setDefaultBus(bus);
077:
078: LoggingInInterceptor logIn = new LoggingInInterceptor();
079: bus.getInInterceptors().add(logIn);
080: LoggingOutInterceptor logOut = new LoggingOutInterceptor();
081: bus.getOutFaultInterceptors().add(logOut);
082: bus.getOutFaultInterceptors().add(logOut);
083:
084: bus.getExtension(RMManager.class).getRMAssertion()
085: .getBaseRetransmissionInterval().setMilliseconds(
086: new BigInteger("60000"));
087:
088: GreeterImpl implementor = new GreeterImpl();
089: String address = "http://localhost:9020/SoapContext/GreeterPort";
090: Endpoint.publish(address, implementor);
091: LOG.info("Published greeter endpoint.");
092: }
093:
094: public static void main(String[] args) {
095: try {
096: RMTxStore.deleteDatabaseFiles();
097: Server s = new Server();
098: s.start();
099: } catch (Exception ex) {
100: ex.printStackTrace();
101: System.exit(-1);
102: } finally {
103: System.out.println("done!");
104: }
105: }
106: }
107:
108: @BeforeClass
109: public static void startServers() throws Exception {
110: String derbyHome = System.getProperty("derby.system.home");
111: try {
112: System.setProperty("derby.system.home", derbyHome
113: + "-server");
114: assertTrue("server did not launch correctly",
115: launchServer(Server.class));
116: } finally {
117: System.setProperty("derby.system.home", derbyHome);
118: }
119: RMTxStore.deleteDatabaseFiles();
120: }
121:
122: @AfterClass
123: public static void tearDownOnce() {
124: RMTxStore.deleteDatabaseFiles(RMTxStore.DEFAULT_DATABASE_NAME,
125: false);
126: }
127:
128: @Test
129: public void testRecovery() throws Exception {
130: startClient();
131: populateStore();
132: verifyStorePopulation();
133: stopClient();
134: startClient();
135: recover();
136: verifyRecovery();
137: }
138:
139: void startClient() {
140: LOG.fine("Creating greeter client");
141: SpringBusFactory bf = new SpringBusFactory();
142: bus = bf
143: .createBus("/org/apache/cxf/systest/ws/rm/persistent.xml");
144: BusFactory.setDefaultBus(bus);
145:
146: GreeterService gs = new GreeterService();
147: greeter = gs.getGreeterPort();
148:
149: out = new OutMessageRecorder();
150: in = new InMessageRecorder();
151:
152: bus.getOutInterceptors().add(out);
153: bus.getInInterceptors().add(in);
154: }
155:
156: void populateStore() throws Exception {
157:
158: bus.getExtension(RMManager.class).getRMAssertion()
159: .getBaseRetransmissionInterval().setMilliseconds(
160: new BigInteger("60000"));
161: bus.getOutInterceptors().add(new MessageLossSimulator());
162:
163: greeter.greetMeOneWay("one");
164: greeter.greetMeOneWay("two");
165: greeter.greetMeOneWay("three");
166: greeter.greetMeOneWay("four");
167:
168: MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in
169: .getInboundMessages());
170:
171: assertNotNull(mf);
172: awaitMessages(5, 3);
173:
174: mf.verifyMessages(5, true);
175: String[] expectedActions = new String[] {
176: RMConstants.getCreateSequenceAction(),
177: GREETMEONEWAY_ACTION, GREETMEONEWAY_ACTION,
178: GREETMEONEWAY_ACTION, GREETMEONEWAY_ACTION };
179: mf.verifyActions(expectedActions, true);
180: mf.verifyMessageNumbers(
181: new String[] { null, "1", "2", "3", "4" }, true);
182: mf.verifyAcknowledgements(new boolean[5], true);
183:
184: mf.verifyMessages(3, false);
185: expectedActions = new String[] {
186: RMConstants.getCreateSequenceResponseAction(),
187: RMConstants.getSequenceAcknowledgmentAction(),
188: RMConstants.getSequenceAcknowledgmentAction() };
189: mf.verifyActions(expectedActions, false);
190: mf.verifyAcknowledgements(new boolean[] { false, true, true },
191: false);
192: }
193:
194: void verifyStorePopulation() {
195:
196: RMManager manager = bus.getExtension(RMManager.class);
197: assertNotNull(manager);
198:
199: RMStore store = manager.getStore();
200: assertNotNull(store);
201:
202: Client client = ClientProxy.getClient(greeter);
203: String id = RMUtils.getEndpointIdentifier(client.getEndpoint());
204:
205: Collection<DestinationSequence> dss = store
206: .getDestinationSequences(id);
207: assertEquals(1, dss.size());
208:
209: Collection<SourceSequence> sss = store.getSourceSequences(id);
210: assertEquals(1, sss.size());
211:
212: Collection<RMMessage> msgs = store.getMessages(sss.iterator()
213: .next().getIdentifier(), true);
214: assertEquals(2, msgs.size());
215:
216: msgs = store.getMessages(sss.iterator().next().getIdentifier(),
217: false);
218: assertEquals(0, msgs.size());
219: }
220:
221: void stopClient() {
222: // ClientProxy.getClient(greeter).destroy();
223: bus.shutdown(true);
224: }
225:
226: void recover() throws Exception {
227:
228: // do nothing - resends should happen in the background
229:
230: Thread.sleep(5000);
231: LOG.info("Recovered messages should have been resent by now.");
232:
233: }
234:
235: void verifyRecovery() throws Exception {
236:
237: RMManager manager = bus.getExtension(RMManager.class);
238: assertNotNull(manager);
239:
240: RMStore store = manager.getStore();
241: assertNotNull(store);
242:
243: Client client = ClientProxy.getClient(greeter);
244: String id = RMUtils.getEndpointIdentifier(client.getEndpoint());
245:
246: Collection<DestinationSequence> dss = store
247: .getDestinationSequences(id);
248: assertEquals(1, dss.size());
249:
250: Collection<SourceSequence> sss = store.getSourceSequences(id);
251: assertEquals(1, sss.size());
252:
253: int i = 0;
254: while (store.getMessages(sss.iterator().next().getIdentifier(),
255: true).size() > 0
256: && i < 10) {
257: Thread.sleep(200);
258: i++;
259: }
260:
261: assertEquals(0, store.getMessages(
262: sss.iterator().next().getIdentifier(), true).size());
263: assertEquals(0, store.getMessages(
264: sss.iterator().next().getIdentifier(), false).size());
265: }
266:
267: private void awaitMessages(int nExpectedOut, int nExpectedIn) {
268: awaitMessages(nExpectedOut, nExpectedIn, 10000);
269: }
270:
271: private void awaitMessages(int nExpectedOut, int nExpectedIn,
272: int timeout) {
273: MessageRecorder mr = new MessageRecorder(out, in);
274: mr.awaitMessages(nExpectedOut, nExpectedIn, timeout);
275: }
276:
277: }
|