001: package org.objectweb.celtix.bus.ws.rm;
002:
003: import java.io.OutputStream;
004: import java.math.BigInteger;
005: import java.util.ArrayList;
006: import java.util.List;
007:
008: import javax.xml.ws.handler.MessageContext;
009:
010: import junit.framework.TestCase;
011:
012: import org.easymock.IMocksControl;
013: import org.easymock.classextension.EasyMock;
014: import org.objectweb.celtix.bindings.AbstractBindingBase;
015: import org.objectweb.celtix.bindings.AbstractBindingImpl;
016: import org.objectweb.celtix.bindings.DataBindingCallback;
017: import org.objectweb.celtix.bindings.ServerDataBindingCallback;
018: import org.objectweb.celtix.bus.ws.addressing.AddressingPropertiesImpl;
019: import org.objectweb.celtix.context.InputStreamMessageContext;
020: import org.objectweb.celtix.context.ObjectMessageContext;
021: import org.objectweb.celtix.context.OutputStreamMessageContext;
022: import org.objectweb.celtix.handlers.HandlerInvoker;
023: import org.objectweb.celtix.transports.ClientTransport;
024: import org.objectweb.celtix.transports.ServerTransport;
025: import org.objectweb.celtix.transports.Transport;
026: import org.objectweb.celtix.workqueue.WorkQueue;
027: import org.objectweb.celtix.ws.rm.Identifier;
028: import org.objectweb.celtix.ws.rm.RMProperties;
029: import org.objectweb.celtix.ws.rm.SequenceType;
030: import org.objectweb.celtix.ws.rm.persistence.RMStore;
031:
032: import static org.objectweb.celtix.bindings.JAXWSConstants.DATABINDING_CALLBACK_PROPERTY;
033: import static org.objectweb.celtix.context.ObjectMessageContext.REQUESTOR_ROLE_PROPERTY;
034: import static org.objectweb.celtix.ws.addressing.JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES_OUTBOUND;
035: import static org.objectweb.celtix.ws.addressing.JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_OUTBOUND;
036: import static org.objectweb.celtix.ws.rm.JAXWSRMConstants.RM_PROPERTIES_OUTBOUND;
037:
038: /**
039: * Test resend logic.
040: */
041: public class RetransmissionQueueTest extends TestCase {
042:
043: private IMocksControl control;
044: private RMHandler handler;
045: private WorkQueue workQueue;
046: private RetransmissionQueue queue;
047: private TestResender resender;
048: private List<ObjectMessageContext> contexts = new ArrayList<ObjectMessageContext>();
049: private List<RMProperties> properties = new ArrayList<RMProperties>();
050: private List<SequenceType> sequences = new ArrayList<SequenceType>();
051: private List<Identifier> identifiers = new ArrayList<Identifier>();
052:
053: public void setUp() {
054: control = EasyMock.createNiceControl();
055: handler = control.createMock(RMHandler.class);
056: queue = new RetransmissionQueue(handler);
057: resender = new TestResender();
058: queue.replaceResender(resender);
059: workQueue = control.createMock(WorkQueue.class);
060: /*
061: workQueue.schedule(queue.getResendInitiator(),
062: queue.getBaseRetransmissionInterval());
063: EasyMock.expectLastCall();
064: */
065: }
066:
067: public void tearDown() {
068: control.verify();
069: contexts.clear();
070: properties.clear();
071: sequences.clear();
072: }
073:
074: public void testCtor() {
075: ready();
076: assertNotNull("expected unacked map", queue.getUnacknowledged());
077: assertEquals("expected empty unacked map", 0, queue
078: .getUnacknowledged().size());
079: assertEquals("unexpected base retransmission interval", 3000L,
080: queue.getBaseRetransmissionInterval());
081: assertEquals("unexpected exponential backoff", 2, queue
082: .getExponentialBackoff());
083: }
084:
085: public void testCacheUnacknowledged() {
086: ObjectMessageContext context1 = setUpContext("sequence1");
087: ObjectMessageContext context2 = setUpContext("sequence2");
088: ObjectMessageContext context3 = setUpContext("sequence1");
089:
090: setupContextMAPs(context1);
091: setupContextMAPs(context2);
092: setupContextMAPs(context3);
093:
094: ready();
095:
096: assertNotNull("expected resend candidate", queue
097: .cacheUnacknowledged(context1));
098: assertEquals("expected non-empty unacked map", 1, queue
099: .getUnacknowledged().size());
100: List<RetransmissionQueue.ResendCandidate> sequence1List = queue
101: .getUnacknowledged().get("sequence1");
102: assertNotNull("expected non-null context list", sequence1List);
103: assertSame("expected context list entry", context1,
104: sequence1List.get(0).getContext());
105:
106: assertNotNull("expected resend candidate", queue
107: .cacheUnacknowledged(context2));
108: assertEquals("unexpected unacked map size", 2, queue
109: .getUnacknowledged().size());
110: List<RetransmissionQueue.ResendCandidate> sequence2List = queue
111: .getUnacknowledged().get("sequence2");
112: assertNotNull("expected non-null context list", sequence2List);
113: assertSame("expected context list entry", context2,
114: sequence2List.get(0).getContext());
115:
116: assertNotNull("expected resend candidate", queue
117: .cacheUnacknowledged(context3));
118: assertEquals("un expected unacked map size", 2, queue
119: .getUnacknowledged().size());
120: sequence1List = queue.getUnacknowledged().get("sequence1");
121: assertNotNull("expected non-null context list", sequence1List);
122: assertSame("expected context list entry", context3,
123: sequence1List.get(1).getContext());
124: }
125:
126: public void testPurgeAcknowledgedSome() {
127: BigInteger[] messageNumbers = { BigInteger.TEN, BigInteger.ONE };
128: SourceSequence sequence = setUpSequence("sequence1",
129: messageNumbers, new boolean[] { true, false });
130: List<RetransmissionQueue.ResendCandidate> sequenceList = new ArrayList<RetransmissionQueue.ResendCandidate>();
131: queue.getUnacknowledged().put("sequence1", sequenceList);
132: ObjectMessageContext context1 = setUpContext("sequence1",
133: messageNumbers[0]);
134: sequenceList.add(queue.createResendCandidate(context1));
135: ObjectMessageContext context2 = setUpContext("sequence1",
136: messageNumbers[1]);
137: sequenceList.add(queue.createResendCandidate(context2));
138: ready();
139:
140: queue.purgeAcknowledged(sequence);
141: assertEquals("unexpected unacked map size", 1, queue
142: .getUnacknowledged().size());
143: assertEquals("unexpected unacked list size", 1, sequenceList
144: .size());
145: }
146:
147: public void testPurgeAcknowledgedNone() {
148: BigInteger[] messageNumbers = { BigInteger.TEN, BigInteger.ONE };
149: SourceSequence sequence = setUpSequence("sequence1",
150: messageNumbers, new boolean[] { false, false });
151: List<RetransmissionQueue.ResendCandidate> sequenceList = new ArrayList<RetransmissionQueue.ResendCandidate>();
152: queue.getUnacknowledged().put("sequence1", sequenceList);
153: ObjectMessageContext context1 = setUpContext("sequence1",
154: messageNumbers[0]);
155: sequenceList.add(queue.createResendCandidate(context1));
156: ObjectMessageContext context2 = setUpContext("sequence1",
157: messageNumbers[1]);
158: sequenceList.add(queue.createResendCandidate(context2));
159: ready();
160:
161: queue.purgeAcknowledged(sequence);
162: assertEquals("unexpected unacked map size", 1, queue
163: .getUnacknowledged().size());
164: assertEquals("unexpected unacked list size", 2, sequenceList
165: .size());
166: }
167:
168: public void testCountUnacknowledged() {
169: BigInteger[] messageNumbers = { BigInteger.TEN, BigInteger.ONE };
170: SourceSequence sequence = setUpSequence("sequence1",
171: messageNumbers, null);
172: List<RetransmissionQueue.ResendCandidate> sequenceList = new ArrayList<RetransmissionQueue.ResendCandidate>();
173: queue.getUnacknowledged().put("sequence1", sequenceList);
174: ObjectMessageContext context1 = setUpContext("sequence1",
175: messageNumbers[0], false);
176: sequenceList.add(queue.createResendCandidate(context1));
177: ObjectMessageContext context2 = setUpContext("sequence1",
178: messageNumbers[1], false);
179: sequenceList.add(queue.createResendCandidate(context2));
180: ready();
181:
182: assertEquals("unexpected unacked count", 2, queue
183: .countUnacknowledged(sequence));
184: }
185:
186: public void testCountUnacknowledgedUnknownSequence() {
187: BigInteger[] messageNumbers = { BigInteger.TEN, BigInteger.ONE };
188: SourceSequence sequence = setUpSequence("sequence1",
189: messageNumbers, null);
190: ready();
191:
192: assertEquals("unexpected unacked count", 0, queue
193: .countUnacknowledged(sequence));
194: }
195:
196: public void testResendInitiatorBackoffLogic() {
197: ObjectMessageContext context1 = setUpContext("sequence1");
198: ObjectMessageContext context2 = setUpContext("sequence2");
199: ObjectMessageContext context3 = setUpContext("sequence1");
200: setupContextMAPs(context1);
201: setupContextMAPs(context2);
202: setupContextMAPs(context3);
203: ready();
204: RetransmissionQueue.ResendCandidate candidate1 = queue
205: .cacheUnacknowledged(context1);
206: RetransmissionQueue.ResendCandidate candidate2 = queue
207: .cacheUnacknowledged(context2);
208: RetransmissionQueue.ResendCandidate candidate3 = queue
209: .cacheUnacknowledged(context3);
210: RetransmissionQueue.ResendCandidate[] allCandidates = {
211: candidate1, candidate2, candidate3 };
212: boolean[] expectAckRequested = { true, true, false };
213:
214: // initial run => none due
215: runInitiator();
216:
217: // all 3 candidates due
218: runInitiator(allCandidates);
219: runCandidates(allCandidates, expectAckRequested);
220:
221: // exponential backoff => none due
222: runInitiator();
223:
224: // all 3 candidates due
225: runInitiator(allCandidates);
226: runCandidates(allCandidates, expectAckRequested);
227:
228: for (int i = 0; i < 3; i++) {
229: // exponential backoff => none due
230: runInitiator();
231: }
232:
233: // all 3 candidates due
234: runInitiator(allCandidates);
235: runCandidates(allCandidates, expectAckRequested);
236:
237: for (int i = 0; i < 7; i++) {
238: // exponential backoff => none due
239: runInitiator();
240: }
241:
242: // all 3 candidates due
243: runInitiator(allCandidates);
244: runCandidates(allCandidates, expectAckRequested);
245: }
246:
247: public void testResendInitiatorDueLogic() {
248: ObjectMessageContext context1 = setUpContext("sequence1");
249: ObjectMessageContext context2 = setUpContext("sequence2");
250: ObjectMessageContext context3 = setUpContext("sequence1");
251: setupContextMAPs(context1);
252: setupContextMAPs(context2);
253: setupContextMAPs(context3);
254: ready();
255: RetransmissionQueue.ResendCandidate candidate1 = queue
256: .cacheUnacknowledged(context1);
257: RetransmissionQueue.ResendCandidate candidate2 = queue
258: .cacheUnacknowledged(context2);
259: RetransmissionQueue.ResendCandidate candidate3 = queue
260: .cacheUnacknowledged(context3);
261: RetransmissionQueue.ResendCandidate[] allCandidates = {
262: candidate1, candidate2, candidate3 };
263: boolean[] expectAckRequested = { true, true, false };
264:
265: // initial run => none due
266: runInitiator();
267:
268: // all 3 candidates due
269: runInitiator(allCandidates);
270:
271: // all still pending => none due
272: runInitiator();
273:
274: candidate1.run();
275: candidate2.run();
276:
277: // exponential backoff => none due
278: runInitiator();
279:
280: // candidates 1 & 2 run => only these due
281: runInitiator(new RetransmissionQueue.ResendCandidate[] {
282: candidate1, candidate2 });
283:
284: runCandidates(allCandidates, expectAckRequested);
285:
286: // exponential backoff => none due
287: runInitiator();
288:
289: // candidates 3 run belatedly => now due
290: runInitiator(new RetransmissionQueue.ResendCandidate[] { candidate3 });
291:
292: // exponential backoff => none due
293: runInitiator();
294:
295: // candidates 1 & 2 now due
296: runInitiator(new RetransmissionQueue.ResendCandidate[] {
297: candidate1, candidate2 });
298: }
299:
300: public void testResendInitiatorResolvedLogic() {
301: ObjectMessageContext context1 = setUpContext("sequence1");
302: ObjectMessageContext context2 = setUpContext("sequence2");
303: ObjectMessageContext context3 = setUpContext("sequence1");
304: setupContextMAPs(context1);
305: setupContextMAPs(context2);
306: setupContextMAPs(context3);
307: ready();
308: RetransmissionQueue.ResendCandidate candidate1 = queue
309: .cacheUnacknowledged(context1);
310: RetransmissionQueue.ResendCandidate candidate2 = queue
311: .cacheUnacknowledged(context2);
312: RetransmissionQueue.ResendCandidate candidate3 = queue
313: .cacheUnacknowledged(context3);
314: RetransmissionQueue.ResendCandidate[] allCandidates = {
315: candidate1, candidate2, candidate3 };
316: boolean[] expectAckRequested = { true, true, false };
317:
318: // initial run => none due
319: runInitiator();
320:
321: // all 3 candidates due
322: runInitiator(allCandidates);
323: runCandidates(allCandidates, expectAckRequested);
324:
325: // exponential backoff => none due
326: runInitiator();
327:
328: candidate1.resolved();
329: candidate3.resolved();
330:
331: // candidates 1 & 3 resolved => only candidate2 due
332: runInitiator(new RetransmissionQueue.ResendCandidate[] { candidate2 });
333: }
334:
335: public void testResenderInitiatorReschedule() {
336: ready();
337:
338: runInitiator();
339: }
340:
341: public void testResenderInitiatorNoRescheduleOnShutdown() {
342: ready();
343:
344: queue.shutdown();
345: queue.getResendInitiator().run();
346: }
347:
348: public void testDefaultResenderClient() throws Exception {
349: doTestDefaultResender(true);
350: }
351:
352: public void testDefaultResenderServer() throws Exception {
353: doTestDefaultResender(false);
354: }
355:
356: private void doTestDefaultResender(boolean isRequestor)
357: throws Exception {
358: ObjectMessageContext context1 = setUpContext("sequence1");
359: setupContextMAPs(context1);
360: queue.replaceResender(queue.getDefaultResender());
361: ready();
362: RetransmissionQueue.ResendCandidate candidate1 = queue
363: .cacheUnacknowledged(context1);
364: RetransmissionQueue.ResendCandidate[] allCandidates = { candidate1 };
365:
366: // initial run => none due
367: runInitiator();
368:
369: // single candidate due
370: runInitiator(allCandidates);
371: setUpDefaultResender(0, isRequestor);
372: allCandidates[0].run();
373: }
374:
375: private ObjectMessageContext setUpContext(String sid) {
376: return setUpContext(sid, null);
377: }
378:
379: private ObjectMessageContext setUpContext(String sid,
380: BigInteger messageNumber) {
381: return setUpContext(sid, messageNumber, true);
382: }
383:
384: private ObjectMessageContext setUpContext(String sid,
385: BigInteger messageNumber, boolean storeSequence) {
386: ObjectMessageContext context = control
387: .createMock(ObjectMessageContext.class);
388: if (storeSequence) {
389: setUpSequenceType(context, sid, messageNumber);
390: }
391: contexts.add(context);
392:
393: return context;
394: }
395:
396: private void setupContextMAPs(ObjectMessageContext context) {
397: AddressingPropertiesImpl maps = control
398: .createMock(AddressingPropertiesImpl.class);
399: context.get(CLIENT_ADDRESSING_PROPERTIES_OUTBOUND);
400: EasyMock.expectLastCall().andReturn(maps);
401: }
402:
403: private void ready() {
404: control.replay();
405: queue.start(workQueue);
406: }
407:
408: private void setUpDefaultResender(int i, boolean isRequestor)
409: throws Exception {
410: assertTrue("too few contexts", i < contexts.size());
411: assertTrue("too few properties", i < properties.size());
412: assertTrue("too few sequences", i < sequences.size());
413: control.verify();
414: control.reset();
415:
416: contexts.get(i).get(RM_PROPERTIES_OUTBOUND);
417: EasyMock.expectLastCall().andReturn(properties.get(i)).times(2);
418: properties.get(i).getSequence();
419: EasyMock.expectLastCall().andReturn(sequences.get(i)).times(2);
420: AddressingPropertiesImpl maps = control
421: .createMock(AddressingPropertiesImpl.class);
422: contexts.get(i).get(REQUESTOR_ROLE_PROPERTY);
423: EasyMock.expectLastCall().andReturn(
424: Boolean.valueOf(isRequestor)).times(2);
425: contexts.get(i).get(SERVER_ADDRESSING_PROPERTIES_OUTBOUND);
426: EasyMock.expectLastCall().andReturn(maps);
427: sequences.get(i).getIdentifier();
428: EasyMock.expectLastCall().andReturn(identifiers.get(i));
429: Transport transport = isRequestor ? control
430: .createMock(ClientTransport.class) : control
431: .createMock(ServerTransport.class);
432: if (isRequestor) {
433: handler.getClientTransport();
434: EasyMock.expectLastCall().andReturn(transport).times(2);
435: } else {
436: handler.getServerTransport();
437: EasyMock.expectLastCall().andReturn(transport).times(1);
438: }
439: AbstractBindingBase binding = control
440: .createMock(AbstractBindingBase.class);
441: handler.getBinding();
442: EasyMock.expectLastCall().andReturn(binding);
443: HandlerInvoker handlerInvoker = control
444: .createMock(HandlerInvoker.class);
445: binding.createHandlerInvoker();
446: EasyMock.expectLastCall().andReturn(handlerInvoker);
447: AbstractBindingImpl bindingImpl = control
448: .createMock(AbstractBindingImpl.class);
449: binding.getBindingImpl();
450: EasyMock.expectLastCall().andReturn(bindingImpl).times(
451: isRequestor ? 6 : 5);
452: bindingImpl.createBindingMessageContext(contexts.get(i));
453: MessageContext bindingContext = control
454: .createMock(MessageContext.class);
455: EasyMock.expectLastCall().andReturn(bindingContext);
456: OutputStreamMessageContext outputStreamContext = control
457: .createMock(OutputStreamMessageContext.class);
458: transport.createOutputStreamContext(bindingContext);
459: EasyMock.expectLastCall().andReturn(outputStreamContext);
460:
461: if (isRequestor) {
462: setUpClientDispatch(handlerInvoker, contexts.get(i),
463: bindingContext, outputStreamContext, bindingImpl,
464: transport);
465: } else {
466: setUpServerDispatch(bindingContext, outputStreamContext);
467: }
468:
469: control.replay();
470: }
471:
472: private void setUpClientDispatch(HandlerInvoker handlerInvoker,
473: ObjectMessageContext objectContext,
474: MessageContext bindingContext,
475: OutputStreamMessageContext outputStreamContext,
476: AbstractBindingImpl bindingImpl, Transport transport)
477: throws Exception {
478: handlerInvoker.invokeProtocolHandlers(true, bindingContext);
479: EasyMock.expectLastCall().andReturn(Boolean.TRUE);
480: InputStreamMessageContext inputStreamContext = control
481: .createMock(InputStreamMessageContext.class);
482: ((ClientTransport) transport).invoke(outputStreamContext);
483: EasyMock.expectLastCall().andReturn(inputStreamContext);
484: bindingImpl.read(inputStreamContext, bindingContext);
485: EasyMock.expectLastCall();
486: bindingImpl.hasFault(bindingContext);
487: EasyMock.expectLastCall().andReturn(false);
488: bindingImpl.unmarshal(bindingContext, objectContext, null);
489: EasyMock.expectLastCall();
490: }
491:
492: private void setUpServerDispatch(MessageContext bindingContext,
493: OutputStreamMessageContext outputStreamContext) {
494: DataBindingCallback callback = control
495: .createMock(ServerDataBindingCallback.class);
496: bindingContext.get(DATABINDING_CALLBACK_PROPERTY);
497: EasyMock.expectLastCall().andReturn(callback);
498: OutputStream outputStream = control
499: .createMock(OutputStream.class);
500: outputStreamContext.getOutputStream();
501: EasyMock.expectLastCall().andReturn(outputStream);
502: }
503:
504: private void runInitiator() {
505: runInitiator(null);
506: }
507:
508: private void runInitiator(
509: RetransmissionQueue.ResendCandidate[] dueCandidates) {
510: control.verify();
511: control.reset();
512:
513: for (int i = 0; dueCandidates != null
514: && i < dueCandidates.length; i++) {
515: workQueue.execute(dueCandidates[i]);
516: EasyMock.expectLastCall();
517: }
518: /*
519: workQueue.schedule(queue.getResendInitiator(),
520: queue.getBaseRetransmissionInterval());
521: EasyMock.expectLastCall();
522: */
523: control.replay();
524: queue.getResendInitiator().run();
525: }
526:
527: private void runCandidates(
528: RetransmissionQueue.ResendCandidate[] candidates,
529: boolean[] expectAckRequested) {
530: for (int i = 0; i < candidates.length; i++) {
531: candidates[i].run();
532: assertEquals("unexpected request acknowledge",
533: expectAckRequested[i], resender.includeAckRequested);
534: assertSame("unexpected context",
535: candidates[i].getContext(), resender.context);
536: resender.clear();
537: }
538: }
539:
540: private SequenceType setUpSequenceType(
541: ObjectMessageContext context, String sid,
542: BigInteger messageNumber) {
543: RMProperties rmps = control.createMock(RMProperties.class);
544: if (context != null) {
545: context.get(RM_PROPERTIES_OUTBOUND);
546: EasyMock.expectLastCall().andReturn(rmps);
547: }
548: properties.add(rmps);
549: SequenceType sequence = control.createMock(SequenceType.class);
550: if (context != null) {
551: rmps.getSequence();
552: EasyMock.expectLastCall().andReturn(sequence);
553: }
554: if (messageNumber != null) {
555: sequence.getMessageNumber();
556: EasyMock.expectLastCall().andReturn(messageNumber);
557: } else {
558: Identifier id = control.createMock(Identifier.class);
559: sequence.getIdentifier();
560: EasyMock.expectLastCall().andReturn(id);
561: id.getValue();
562: EasyMock.expectLastCall().andReturn(sid);
563: identifiers.add(id);
564: }
565: sequences.add(sequence);
566: return sequence;
567: }
568:
569: @SuppressWarnings("unchecked")
570: private SourceSequence setUpSequence(String sid,
571: BigInteger[] messageNumbers, boolean[] isAcked) {
572: SourceSequence sequence = control
573: .createMock(SourceSequence.class);
574: Identifier id = control.createMock(Identifier.class);
575: sequence.getIdentifier();
576: EasyMock.expectLastCall().andReturn(id);
577: id.getValue();
578: EasyMock.expectLastCall().andReturn(sid);
579: identifiers.add(id);
580: boolean includesAcked = false;
581: for (int i = 0; isAcked != null && i < isAcked.length; i++) {
582: sequence.isAcknowledged(messageNumbers[i]);
583: EasyMock.expectLastCall().andReturn(isAcked[i]);
584: if (isAcked[i]) {
585: includesAcked = true;
586: }
587: }
588: if (includesAcked) {
589: sequence.getIdentifier();
590: EasyMock.expectLastCall().andReturn(id);
591: RMStore store = control.createMock(RMStore.class);
592: handler.getStore();
593: EasyMock.expectLastCall().andReturn(store);
594: }
595: return sequence;
596: }
597:
598: private static class TestResender implements
599: RetransmissionQueue.Resender {
600: ObjectMessageContext context;
601: boolean includeAckRequested;
602:
603: public void resend(ObjectMessageContext ctx,
604: boolean requestAcknowledge) {
605: context = ctx;
606: includeAckRequested = requestAcknowledge;
607: }
608:
609: void clear() {
610: context = null;
611: includeAckRequested = false;
612: }
613: };
614: }
|