001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.systest.ws.rm;
019:
020: import java.math.BigInteger;
021: import java.util.logging.Logger;
022:
023: import javax.xml.ws.Response;
024:
025: import org.apache.cxf.Bus;
026: import org.apache.cxf.BusFactory;
027: import org.apache.cxf.bus.spring.SpringBusFactory;
028: import org.apache.cxf.endpoint.Client;
029: import org.apache.cxf.frontend.ClientProxy;
030: import org.apache.cxf.greeter_control.Control;
031: import org.apache.cxf.greeter_control.ControlService;
032: import org.apache.cxf.greeter_control.Greeter;
033: import org.apache.cxf.greeter_control.GreeterService;
034: import org.apache.cxf.greeter_control.types.GreetMeResponse;
035: import org.apache.cxf.systest.ws.util.ConnectionHelper;
036: import org.apache.cxf.systest.ws.util.InMessageRecorder;
037: import org.apache.cxf.systest.ws.util.MessageFlow;
038: import org.apache.cxf.systest.ws.util.MessageRecorder;
039: import org.apache.cxf.systest.ws.util.OutMessageRecorder;
040: import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
041: import org.apache.cxf.transport.http.HTTPConduit;
042: import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
043: import org.apache.cxf.ws.rm.RMConstants;
044: import org.apache.cxf.ws.rm.RMManager;
045: import org.apache.cxf.ws.rm.persistence.jdbc.RMTxStore;
046: import org.junit.BeforeClass;
047: import org.junit.Test;
048:
049: /**
050: * Tests the addition of WS-RM properties to application messages and the
051: * exchange of WS-RM protocol messages.
052: */
053: public class ServerPersistenceTest extends
054: AbstractBusClientServerTestBase {
055:
056: public static final String GREETMEONEWAY_ACTION = null;
057: public static final String GREETME_ACTION = null;
058: private static final Logger LOG = Logger
059: .getLogger(ServerPersistenceTest.class.getName());
060: private static final String CFG = "/org/apache/cxf/systest/ws/rm/persistent.xml";
061: private static final String SERVER_LOSS_CFG = "/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml";
062:
063: private OutMessageRecorder out;
064: private InMessageRecorder in;
065:
066: @BeforeClass
067: public static void startServers() throws Exception {
068: RMTxStore.deleteDatabaseFiles();
069: String derbyHome = System.getProperty("derby.system.home");
070: try {
071: System.setProperty("derby.system.home", derbyHome
072: + "-server");
073: RMTxStore.deleteDatabaseFiles();
074: } finally {
075: if (derbyHome != null) {
076: System.setProperty("derby.system.home", derbyHome);
077: } else {
078: System.clearProperty("derby.system.home");
079: }
080: }
081:
082: // run server in process to avoid a problem with UUID generation
083: // during asynchronous invocations
084:
085: boolean inProcess = "Windows 2000".equals(System
086: .getProperty("os.name"));
087: assertTrue("server did not launch correctly", launchServer(
088: Server.class, inProcess));
089: }
090:
091: @Test
092: public void testRecovery() throws Exception {
093: SpringBusFactory bf = new SpringBusFactory();
094: bus = bf.createBus();
095: BusFactory.setDefaultBus(bus);
096: LOG.fine("Created bus " + bus + " with default cfg");
097: ControlService cs = new ControlService();
098: Control control = cs.getControlPort();
099:
100: assertTrue("Failed to start greeter", control
101: .startGreeter(SERVER_LOSS_CFG));
102: LOG.fine("Started greeter server.");
103:
104: Bus greeterBus = new SpringBusFactory().createBus(CFG);
105: LOG.fine("Created bus " + greeterBus + " with cfg : " + CFG);
106: BusFactory.setDefaultBus(greeterBus);
107:
108: // avoid early client resends
109: greeterBus.getExtension(RMManager.class).getRMAssertion()
110: .getBaseRetransmissionInterval().setMilliseconds(
111: new BigInteger("60000"));
112: GreeterService gs = new GreeterService();
113: Greeter greeter = gs.getGreeterPort();
114:
115: LOG.fine("Created greeter client.");
116:
117: ConnectionHelper.setKeepAliveConnection(greeter, true);
118:
119: Client c = ClientProxy.getClient(greeter);
120: HTTPConduit hc = (HTTPConduit) (c.getConduit());
121: HTTPClientPolicy cp = hc.getClient();
122: cp
123: .setDecoupledEndpoint("http://localhost:9994/decoupled_endpoint");
124:
125: out = new OutMessageRecorder();
126: in = new InMessageRecorder();
127:
128: greeterBus.getOutInterceptors().add(out);
129: greeterBus.getInInterceptors().add(in);
130:
131: LOG.fine("Configured greeter client.");
132:
133: Response<GreetMeResponse> responses[] = cast(new Response[3]);
134:
135: responses[0] = greeter.greetMeAsync("one");
136: responses[1] = greeter.greetMeAsync("two");
137: responses[2] = greeter.greetMeAsync("three");
138:
139: verifyMissingResponse(responses);
140: control.stopGreeter(SERVER_LOSS_CFG);
141: LOG.fine("Stopped greeter server");
142:
143: out.getOutboundMessages().clear();
144: in.getInboundMessages().clear();
145:
146: control.startGreeter(CFG);
147: String nl = System.getProperty("line.separator");
148: LOG.fine("Restarted greeter server" + nl + nl);
149:
150: verifyServerRecovery(responses);
151:
152: greeterBus.shutdown(true);
153:
154: control.stopGreeter(CFG);
155: bus.shutdown(true);
156: }
157:
158: void verifyMissingResponse(Response<GreetMeResponse> responses[])
159: throws Exception {
160: awaitMessages(4, 7, 20000);
161:
162: // wait another while to prove that response to second request is indeed lost
163: Thread.sleep(1000);
164: int nDone = 0;
165: for (int i = 0; i < responses.length; i++) {
166: if (responses[i].isDone()) {
167: nDone++;
168: }
169: }
170:
171: assertEquals(
172: "Unexpected number of responses already received.", 2,
173: nDone);
174:
175: MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in
176: .getInboundMessages());
177: String[] expectedActions = new String[] {
178: RMConstants.getCreateSequenceAction(), GREETME_ACTION,
179: GREETME_ACTION, GREETME_ACTION };
180: mf.verifyActions(expectedActions, true);
181: // mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
182: // mf.verifyAcknowledgements(new boolean[] {false, false, true, false}, true);
183:
184: mf.verifyPartialResponses(4);
185: mf.purgePartialResponses();
186: expectedActions = new String[] {
187: RMConstants.getCreateSequenceResponseAction(),
188: GREETME_ACTION, GREETME_ACTION };
189: mf.verifyActions(expectedActions, false);
190: // mf.verifyMessageNumbers(new String[] {null, "1", "3"}, false);
191: // mf.verifyAcknowledgements(new boolean[] {false, true, true}, false);
192: }
193:
194: void verifyServerRecovery(Response<GreetMeResponse> responses[])
195: throws Exception {
196:
197: // wait until all messages have received their responses
198: int nDone = 0;
199: long waited = 0;
200: while (waited < 5000) {
201: for (int i = 0; i < responses.length; i++) {
202: if (responses[i].isDone()) {
203: nDone++;
204: }
205: }
206: if (nDone == 3) {
207: break;
208: }
209: Thread.sleep(500);
210: nDone = 0;
211: }
212:
213: assertEquals("Not all responses have been received.", 3, nDone);
214:
215: // verify that all inbound messages are resent responses
216:
217: synchronized (this ) {
218: MessageFlow mf = new MessageFlow(out.getOutboundMessages(),
219: in.getInboundMessages());
220: int nOut = out.getOutboundMessages().size();
221: int nIn = in.getInboundMessages().size();
222: assertEquals("Unexpected outbound message(s)", 0, nOut);
223: assertTrue(nIn >= 1);
224: String[] expectedActions = new String[nIn];
225: for (int i = 0; i < nIn; i++) {
226: expectedActions[i] = GREETME_ACTION;
227: }
228: mf.verifyActions(expectedActions, false);
229: }
230: }
231:
232: protected void awaitMessages(int nExpectedOut, int nExpectedIn) {
233: awaitMessages(nExpectedOut, nExpectedIn, 10000);
234: }
235:
236: private void awaitMessages(int nExpectedOut, int nExpectedIn,
237: int timeout) {
238: MessageRecorder mr = new MessageRecorder(out, in);
239: mr.awaitMessages(nExpectedOut, nExpectedIn, timeout);
240: }
241:
242: @SuppressWarnings("unchecked")
243: <T> Response<T>[] cast(Response[] val) {
244: return (Response<T>[]) val;
245: }
246:
247: }
|