001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.math.BigInteger;
021: import java.util.ArrayList;
022: import java.util.Collection;
023: import java.util.List;
024: import java.util.Timer;
025:
026: import org.apache.cxf.helpers.CastUtils;
027: import org.apache.cxf.message.Exchange;
028: import org.apache.cxf.message.Message;
029: import org.apache.cxf.ws.addressing.v200408.AttributedURI;
030: import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
031: import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
032: import org.apache.cxf.ws.rm.manager.AcksPolicyType;
033: import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
034: import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
035: import org.apache.cxf.ws.rm.persistence.RMStore;
036: import org.apache.cxf.ws.rm.policy.RMAssertion;
037: import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
038: import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
039: import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
040: import org.easymock.classextension.EasyMock;
041: import org.easymock.classextension.IMocksControl;
042: import org.junit.After;
043: import org.junit.Assert;
044: import org.junit.Before;
045: import org.junit.Test;
046:
047: public class DestinationSequenceTest extends Assert {
048:
049: private IMocksControl control;
050: private ObjectFactory factory;
051: private Identifier id;
052: private EndpointReferenceType ref;
053: private Destination destination;
054: private RMManager manager;
055: private RMAssertion rma;
056: private AcksPolicyType ap;
057: private DestinationPolicyType dp;
058:
059: @Before
060: public void setUp() {
061: control = EasyMock.createNiceControl();
062: factory = new ObjectFactory();
063: ref = control.createMock(EndpointReferenceType.class);
064: id = factory.createIdentifier();
065: id.setValue("seq");
066: }
067:
068: @After
069: public void tearDown() {
070: ref = null;
071: destination = null;
072: manager = null;
073: rma = null;
074: dp = null;
075: ap = null;
076:
077: }
078:
079: @Test
080: public void testConstructors() {
081:
082: Identifier otherId = factory.createIdentifier();
083: otherId.setValue("otherSeq");
084:
085: DestinationSequence seq = new DestinationSequence(id, ref,
086: destination);
087: assertEquals(id, seq.getIdentifier());
088: assertNull(seq.getLastMessageNumber());
089: assertSame(ref, seq.getAcksTo());
090: assertNotNull(seq.getAcknowledgment());
091: assertNotNull(seq.getMonitor());
092:
093: SequenceAcknowledgement ack = RMUtils.getWSRMFactory()
094: .createSequenceAcknowledgement();
095: seq = new DestinationSequence(id, ref, BigInteger.TEN, ack);
096: assertEquals(id, seq.getIdentifier());
097: assertEquals(BigInteger.TEN, seq.getLastMessageNumber());
098: assertSame(ref, seq.getAcksTo());
099: assertSame(ack, seq.getAcknowledgment());
100: assertNotNull(seq.getMonitor());
101:
102: }
103:
104: @Test
105: public void testEqualsAndHashCode() {
106:
107: DestinationSequence seq = new DestinationSequence(id, ref,
108: destination);
109: DestinationSequence otherSeq = null;
110: assertTrue(!seq.equals(otherSeq));
111: otherSeq = new DestinationSequence(id, ref, destination);
112: assertEquals(seq, otherSeq);
113: assertEquals(seq.hashCode(), otherSeq.hashCode());
114: Identifier otherId = factory.createIdentifier();
115: otherId.setValue("otherSeq");
116: otherSeq = new DestinationSequence(otherId, ref, destination);
117: assertTrue(!seq.equals(otherSeq));
118: assertTrue(seq.hashCode() != otherSeq.hashCode());
119: assertTrue(!seq.equals(this ));
120: }
121:
122: @Test
123: public void testGetSetDestination() {
124: control.replay();
125: DestinationSequence seq = new DestinationSequence(id, ref,
126: destination);
127: seq.setDestination(destination);
128: assertSame(destination, seq.getDestination());
129: }
130:
131: @Test
132: public void testGetEndpointIdentifier() {
133: setUpDestination();
134: String name = "abc";
135: EasyMock.expect(destination.getName()).andReturn(name);
136: control.replay();
137:
138: DestinationSequence seq = new DestinationSequence(id, ref,
139: destination);
140: assertEquals("Unexpected endpoint identifier", name, seq
141: .getEndpointIdentifier());
142: control.verify();
143: }
144:
145: @Test
146: public void testAcknowledgeBasic() throws SequenceFault {
147: setUpDestination();
148: Message message1 = setUpMessage("1");
149: Message message2 = setUpMessage("2");
150: control.replay();
151:
152: DestinationSequence seq = new DestinationSequence(id, ref,
153: destination);
154: List<AcknowledgementRange> ranges = seq.getAcknowledgment()
155: .getAcknowledgementRange();
156: assertEquals(0, ranges.size());
157:
158: seq.acknowledge(message1);
159: assertEquals(1, ranges.size());
160: AcknowledgementRange r1 = ranges.get(0);
161: assertEquals(1, r1.getLower().intValue());
162: assertEquals(1, r1.getUpper().intValue());
163:
164: seq.acknowledge(message2);
165: assertEquals(1, ranges.size());
166: r1 = ranges.get(0);
167: assertEquals(1, r1.getLower().intValue());
168: assertEquals(2, r1.getUpper().intValue());
169:
170: control.verify();
171: }
172:
173: @Test
174: public void testAcknowledgeLastMessageNumberExceeded()
175: throws SequenceFault {
176: setUpDestination();
177: Message message1 = setUpMessage("1");
178: Message message2 = setUpMessage("2");
179: control.replay();
180:
181: DestinationSequence seq = new DestinationSequence(id, ref,
182: destination);
183:
184: seq.acknowledge(message1);
185: seq.setLastMessageNumber(BigInteger.ONE);
186: try {
187: seq.acknowledge(message2);
188: fail("Expected SequenceFault not thrown.");
189: } catch (SequenceFault sf) {
190: assertEquals("LastMessageNumberExceeded", sf
191: .getSequenceFault().getFaultCode().getLocalPart());
192: }
193:
194: control.verify();
195: }
196:
197: @Test
198: public void testAcknowledgeAppendRange() throws SequenceFault {
199: setUpDestination();
200: Message[] messages = new Message[] { setUpMessage("1"),
201: setUpMessage("2"), setUpMessage("5"),
202: setUpMessage("4"), setUpMessage("6") };
203:
204: control.replay();
205:
206: DestinationSequence seq = new DestinationSequence(id, ref,
207: destination);
208: List<AcknowledgementRange> ranges = seq.getAcknowledgment()
209: .getAcknowledgementRange();
210: for (int i = 0; i < messages.length; i++) {
211: seq.acknowledge(messages[i]);
212: }
213: assertEquals(2, ranges.size());
214: AcknowledgementRange r = ranges.get(0);
215: assertEquals(1, r.getLower().intValue());
216: assertEquals(2, r.getUpper().intValue());
217: r = ranges.get(1);
218: assertEquals(4, r.getLower().intValue());
219: assertEquals(6, r.getUpper().intValue());
220:
221: control.verify();
222: }
223:
224: @Test
225: public void testAcknowledgeInsertRange() throws SequenceFault {
226: setUpDestination();
227: Message[] messages = new Message[] { setUpMessage("1"),
228: setUpMessage("2"), setUpMessage("9"),
229: setUpMessage("10"), setUpMessage("4"),
230: setUpMessage("9"), setUpMessage("2") };
231: control.replay();
232:
233: DestinationSequence seq = new DestinationSequence(id, ref,
234: destination);
235: List<AcknowledgementRange> ranges = seq.getAcknowledgment()
236: .getAcknowledgementRange();
237: for (int i = 0; i < messages.length; i++) {
238: seq.acknowledge(messages[i]);
239: }
240:
241: assertEquals(3, ranges.size());
242: AcknowledgementRange r = ranges.get(0);
243: assertEquals(1, r.getLower().intValue());
244: assertEquals(2, r.getUpper().intValue());
245: r = ranges.get(1);
246: assertEquals(4, r.getLower().intValue());
247: assertEquals(4, r.getUpper().intValue());
248: r = ranges.get(2);
249: assertEquals(9, r.getLower().intValue());
250: assertEquals(10, r.getUpper().intValue());
251:
252: control.verify();
253: }
254:
255: @Test
256: public void testAcknowledgePrependRange() throws SequenceFault {
257: setUpDestination();
258: Message[] messages = new Message[] { setUpMessage("4"),
259: setUpMessage("5"), setUpMessage("6"),
260: setUpMessage("4"), setUpMessage("2"), setUpMessage("2") };
261: control.replay();
262:
263: DestinationSequence seq = new DestinationSequence(id, ref,
264: destination);
265: List<AcknowledgementRange> ranges = seq.getAcknowledgment()
266: .getAcknowledgementRange();
267: for (int i = 0; i < messages.length; i++) {
268: seq.acknowledge(messages[i]);
269: }
270: assertEquals(2, ranges.size());
271: AcknowledgementRange r = ranges.get(0);
272: assertEquals(2, r.getLower().intValue());
273: assertEquals(2, r.getUpper().intValue());
274: r = ranges.get(1);
275: assertEquals(4, r.getLower().intValue());
276: assertEquals(6, r.getUpper().intValue());
277:
278: control.verify();
279: }
280:
281: @Test
282: public void testMerge() {
283: DestinationSequence seq = new DestinationSequence(id, ref,
284: destination);
285: List<AcknowledgementRange> ranges = seq.getAcknowledgment()
286: .getAcknowledgementRange();
287: AcknowledgementRange r;
288: for (int i = 0; i < 5; i++) {
289: r = new AcknowledgementRange();
290: r.setLower(new BigInteger(Integer.toString(3 * i + 1)));
291: r.setUpper(new BigInteger(Integer.toString(3 * i + 3)));
292: ranges.add(r);
293: }
294: seq.mergeRanges();
295: assertEquals(1, ranges.size());
296: r = ranges.get(0);
297: assertEquals(BigInteger.ONE, r.getLower());
298: assertEquals(new BigInteger("15"), r.getUpper());
299: ranges.clear();
300: for (int i = 0; i < 5; i++) {
301: r = new AcknowledgementRange();
302: r.setLower(new BigInteger(Integer.toString(3 * i + 1)));
303: r.setUpper(new BigInteger(Integer.toString(3 * i + 2)));
304: ranges.add(r);
305: }
306: seq.mergeRanges();
307: assertEquals(5, ranges.size());
308: ranges.clear();
309: for (int i = 0; i < 5; i++) {
310: if (i != 2) {
311: r = new AcknowledgementRange();
312: r.setLower(new BigInteger(Integer.toString(3 * i + 1)));
313: r.setUpper(new BigInteger(Integer.toString(3 * i + 3)));
314: ranges.add(r);
315: }
316: }
317: seq.mergeRanges();
318: assertEquals(2, ranges.size());
319: r = ranges.get(0);
320: assertEquals(BigInteger.ONE, r.getLower());
321: assertEquals(new BigInteger("6"), r.getUpper());
322: r = ranges.get(1);
323: assertEquals(BigInteger.TEN, r.getLower());
324: assertEquals(new BigInteger("15"), r.getUpper());
325: }
326:
327: @Test
328: public void testMonitor() throws SequenceFault {
329: setUpDestination();
330: Message[] messages = new Message[15];
331: for (int i = 0; i < messages.length; i++) {
332: messages[i] = setUpMessage(Integer.toString(i + 1));
333: }
334: control.replay();
335:
336: DestinationSequence seq = new DestinationSequence(id, ref,
337: destination);
338: SequenceMonitor monitor = seq.getMonitor();
339: assertNotNull(monitor);
340: monitor.setMonitorInterval(500);
341:
342: assertEquals(0, monitor.getMPM());
343:
344: for (int i = 0; i < 10; i++) {
345: seq.acknowledge(messages[i]);
346: try {
347: Thread.sleep(50);
348: } catch (InterruptedException ex) {
349: // ignore
350: }
351: }
352: int mpm1 = monitor.getMPM();
353: assertTrue(mpm1 > 0);
354:
355: for (int i = 10; i < messages.length; i++) {
356: seq.acknowledge(messages[i]);
357: try {
358: Thread.sleep(100);
359: } catch (InterruptedException ex) {
360: // ignore
361: }
362: }
363: int mpm2 = monitor.getMPM();
364: assertTrue(mpm2 > 0);
365: assertTrue(mpm1 > mpm2);
366:
367: control.verify();
368: }
369:
370: @Test
371: public void testAcknowledgeImmediate() throws SequenceFault {
372: setUpDestination();
373: Message message = setUpMessage("1");
374: control.replay();
375:
376: DestinationSequence seq = new DestinationSequence(id, ref,
377: destination);
378: assertTrue(!seq.sendAcknowledgement());
379:
380: seq.acknowledge(message);
381:
382: assertTrue(seq.sendAcknowledgement());
383: seq.acknowledgmentSent();
384: assertFalse(seq.sendAcknowledgement());
385:
386: control.verify();
387: }
388:
389: @Test
390: public void testAcknowledgeDeferred() throws SequenceFault,
391: RMException {
392: Timer timer = new Timer();
393: setUpDestination(timer);
394:
395: DestinationSequence seq = new DestinationSequence(id, ref,
396: destination);
397: RMEndpoint rme = control.createMock(RMEndpoint.class);
398: EasyMock.expect(destination.getReliableEndpoint()).andReturn(
399: rme).anyTimes();
400: Proxy proxy = control.createMock(Proxy.class);
401: EasyMock.expect(rme.getProxy()).andReturn(proxy).anyTimes();
402: proxy.acknowledge(seq);
403: EasyMock.expectLastCall();
404:
405: Message[] messages = new Message[] { setUpMessage("1"),
406: setUpMessage("2"), setUpMessage("3") };
407: control.replay();
408:
409: ap.setIntraMessageThreshold(0);
410: AcknowledgementInterval ai = new org.apache.cxf.ws.rm.policy.ObjectFactory()
411: .createRMAssertionAcknowledgementInterval();
412: ai.setMilliseconds(new BigInteger("200"));
413: rma.setAcknowledgementInterval(ai);
414:
415: assertTrue(!seq.sendAcknowledgement());
416:
417: for (int i = 0; i < messages.length; i++) {
418: seq.acknowledge(messages[i]);
419: }
420:
421: assertFalse(seq.sendAcknowledgement());
422:
423: try {
424: Thread.sleep(250);
425: } catch (InterruptedException ex) {
426: // ignore
427: }
428: assertTrue(seq.sendAcknowledgement());
429: seq.acknowledgmentSent();
430: assertFalse(seq.sendAcknowledgement());
431:
432: control.verify();
433: }
434:
435: @Test
436: public void testCorrelationID() {
437: setUpDestination();
438: DestinationSequence seq = new DestinationSequence(id, ref,
439: destination);
440: String correlationID = "abdc1234";
441: assertNull("unexpected correlation ID", seq.getCorrelationID());
442: seq.setCorrelationID(correlationID);
443: assertEquals("unexpected correlation ID", correlationID, seq
444: .getCorrelationID());
445: }
446:
447: @Test
448: public void testApplyDeliveryAssuranceAtMostOnce()
449: throws RMException {
450: setUpDestination();
451:
452: BigInteger mn = BigInteger.TEN;
453: SequenceAcknowledgement ack = control
454: .createMock(SequenceAcknowledgement.class);
455: List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
456: AcknowledgementRange r = control
457: .createMock(AcknowledgementRange.class);
458: EasyMock.expect(ack.getAcknowledgementRange())
459: .andReturn(ranges);
460: DeliveryAssuranceType da = control
461: .createMock(DeliveryAssuranceType.class);
462: EasyMock.expect(manager.getDeliveryAssurance()).andReturn(da);
463: EasyMock.expect(da.isSetAtMostOnce()).andReturn(true);
464:
465: control.replay();
466: DestinationSequence ds = new DestinationSequence(id, ref, null,
467: ack);
468: ds.setDestination(destination);
469: ds.applyDeliveryAssurance(mn);
470: control.verify();
471:
472: control.reset();
473: ranges.add(r);
474: EasyMock.expect(destination.getManager()).andReturn(manager);
475: EasyMock.expect(manager.getDeliveryAssurance()).andReturn(da);
476: EasyMock.expect(da.isSetAtMostOnce()).andReturn(true);
477: EasyMock.expect(ack.getAcknowledgementRange())
478: .andReturn(ranges);
479: EasyMock.expect(r.getLower()).andReturn(new BigInteger("5"));
480: EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
481: control.replay();
482: try {
483: ds.applyDeliveryAssurance(mn);
484: fail("Expected Fault not thrown.");
485: } catch (RMException ex) {
486: assertEquals("MESSAGE_ALREADY_DELIVERED_EXC", ex.getCode());
487: }
488:
489: control.verify();
490:
491: }
492:
493: @Test
494: public void testInOrderNoWait() throws RMException {
495: setUpDestination();
496:
497: BigInteger mn = BigInteger.TEN;
498:
499: DeliveryAssuranceType da = control
500: .createMock(DeliveryAssuranceType.class);
501: EasyMock.expect(manager.getDeliveryAssurance()).andReturn(da);
502: EasyMock.expect(da.isSetAtMostOnce()).andReturn(false);
503: EasyMock.expect(da.isSetAtLeastOnce()).andReturn(true);
504: EasyMock.expect(da.isSetInOrder()).andReturn(true);
505:
506: SequenceAcknowledgement ack = control
507: .createMock(SequenceAcknowledgement.class);
508: List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
509: AcknowledgementRange r = control
510: .createMock(AcknowledgementRange.class);
511: ranges.add(r);
512: EasyMock.expect(ack.getAcknowledgementRange())
513: .andReturn(ranges).times(3);
514: EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
515: EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
516:
517: control.replay();
518:
519: DestinationSequence ds = new DestinationSequence(id, ref, null,
520: ack);
521: ds.setDestination(destination);
522: ds.applyDeliveryAssurance(mn);
523: control.verify();
524: }
525:
526: @Test
527: public void testInOrderWait() {
528: setUpDestination();
529: Message[] messages = new Message[5];
530: for (int i = 0; i < messages.length; i++) {
531: messages[i] = setUpMessage(Integer.toString(i + 1));
532: }
533:
534: DeliveryAssuranceType da = control
535: .createMock(DeliveryAssuranceType.class);
536: EasyMock.expect(manager.getDeliveryAssurance()).andReturn(da)
537: .anyTimes();
538: EasyMock.expect(da.isSetAtMostOnce()).andReturn(false)
539: .anyTimes();
540: EasyMock.expect(da.isSetAtLeastOnce()).andReturn(true)
541: .anyTimes();
542: EasyMock.expect(da.isSetInOrder()).andReturn(true).anyTimes();
543:
544: SequenceAcknowledgement ack = factory
545: .createSequenceAcknowledgement();
546: List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
547:
548: final AcknowledgementRange r = factory
549: .createSequenceAcknowledgementAcknowledgementRange();
550: r.setUpper(new BigInteger(Integer.toString(messages.length)));
551: ranges.add(r);
552: final DestinationSequence ds = new DestinationSequence(id, ref,
553: null, ack);
554: ds.setDestination(destination);
555:
556: class Acknowledger extends Thread {
557: Message message;
558: BigInteger messageNr;
559:
560: Acknowledger(Message m, BigInteger mn) {
561: message = m;
562: messageNr = mn;
563: }
564:
565: public void run() {
566: try {
567: ds.acknowledge(message);
568: ds.applyDeliveryAssurance(messageNr);
569: } catch (Exception ex) {
570: // ignore
571: }
572: }
573: }
574:
575: control.replay();
576:
577: Thread[] threads = new Thread[messages.length];
578: for (int i = messages.length - 1; i >= 0; i--) {
579: threads[i] = new Acknowledger(messages[i], new BigInteger(
580: Integer.toString(i + 1)));
581: threads[i].start();
582: try {
583: Thread.sleep(100);
584: } catch (InterruptedException ex) {
585: // ignore
586: }
587: }
588:
589: boolean timedOut = false;
590: for (int i = 0; i < messages.length; i++) {
591: try {
592: threads[i].join(1000);
593: } catch (InterruptedException ex) {
594: timedOut = true;
595: }
596: }
597: assertTrue(
598: "timed out waiting for messages to be processed in order",
599: !timedOut);
600:
601: control.verify();
602: }
603:
604: @Test
605: public void testAllPredecessorsAcknowledged() {
606:
607: SequenceAcknowledgement ack = control
608: .createMock(SequenceAcknowledgement.class);
609: List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
610: AcknowledgementRange r = control
611: .createMock(AcknowledgementRange.class);
612: EasyMock.expect(ack.getAcknowledgementRange())
613: .andReturn(ranges);
614: control.replay();
615: DestinationSequence ds = new DestinationSequence(id, ref, null,
616: ack);
617: ds.setDestination(destination);
618: assertTrue("all predecessors acknowledged", !ds
619: .allPredecessorsAcknowledged(BigInteger.TEN));
620: control.verify();
621:
622: control.reset();
623: ranges.add(r);
624: EasyMock.expect(ack.getAcknowledgementRange())
625: .andReturn(ranges).times(2);
626: EasyMock.expect(r.getLower()).andReturn(BigInteger.TEN);
627: control.replay();
628: assertTrue("all predecessors acknowledged", !ds
629: .allPredecessorsAcknowledged(BigInteger.TEN));
630: control.verify();
631:
632: control.reset();
633: EasyMock.expect(ack.getAcknowledgementRange())
634: .andReturn(ranges).times(3);
635: EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
636: EasyMock.expect(r.getUpper()).andReturn(new BigInteger("5"));
637: control.replay();
638: assertTrue("all predecessors acknowledged", !ds
639: .allPredecessorsAcknowledged(BigInteger.TEN));
640: control.verify();
641:
642: control.reset();
643: EasyMock.expect(ack.getAcknowledgementRange())
644: .andReturn(ranges).times(3);
645: EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
646: EasyMock.expect(r.getUpper()).andReturn(BigInteger.TEN);
647: control.replay();
648: assertTrue("not all predecessors acknowledged", ds
649: .allPredecessorsAcknowledged(BigInteger.TEN));
650: control.verify();
651:
652: ranges.add(r);
653: control.reset();
654: EasyMock.expect(ack.getAcknowledgementRange())
655: .andReturn(ranges);
656: control.replay();
657: assertTrue("all predecessors acknowledged", !ds
658: .allPredecessorsAcknowledged(BigInteger.TEN));
659: control.verify();
660: }
661:
662: @Test
663: public void testScheduleSequenceTermination() throws SequenceFault {
664: Timer timer = new Timer();
665: setUpDestination(timer);
666:
667: DestinationSequence seq = new DestinationSequence(id, ref,
668: destination);
669: destination.removeSequence(seq);
670: EasyMock.expectLastCall();
671:
672: Message message = setUpMessage("1");
673:
674: RMEndpoint rme = control.createMock(RMEndpoint.class);
675: EasyMock.expect(destination.getReliableEndpoint()).andReturn(
676: rme);
677: long arrival = System.currentTimeMillis();
678: EasyMock.expect(rme.getLastApplicationMessage()).andReturn(
679: arrival);
680:
681: control.replay();
682: InactivityTimeout iat = new RMAssertion.InactivityTimeout();
683: iat.setMilliseconds(new BigInteger("200"));
684: rma.setInactivityTimeout(iat);
685:
686: seq.acknowledge(message);
687:
688: try {
689: Thread.sleep(250);
690: } catch (InterruptedException ex) {
691: // ignore
692: }
693:
694: control.verify();
695: }
696:
697: @Test
698: public void testSequenceTermination() {
699: destination = control.createMock(Destination.class);
700: DestinationSequence seq = new DestinationSequence(id, ref,
701: destination);
702: RMEndpoint rme = control.createMock(RMEndpoint.class);
703: EasyMock.expect(destination.getReliableEndpoint()).andReturn(
704: rme);
705: DestinationSequence.SequenceTermination st = seq.new SequenceTermination();
706: st.updateInactivityTimeout(30000L);
707: long lastAppMessage = System.currentTimeMillis() - 30000L;
708: EasyMock.expect(rme.getLastControlMessage()).andReturn(0L);
709: EasyMock.expect(rme.getLastApplicationMessage()).andReturn(
710: lastAppMessage);
711: destination.removeSequence(seq);
712: EasyMock.expectLastCall();
713: control.replay();
714: st.run();
715: control.verify();
716: }
717:
718: @Test
719: public void testSequenceTerminationNotNecessary() {
720: destination = control.createMock(Destination.class);
721: manager = control.createMock(RMManager.class);
722: EasyMock.expect(destination.getManager()).andReturn(manager);
723: Timer t = new Timer();
724: EasyMock.expect(manager.getTimer()).andReturn(t);
725: DestinationSequence seq = new DestinationSequence(id, ref,
726: destination);
727: RMEndpoint rme = control.createMock(RMEndpoint.class);
728: EasyMock.expect(destination.getReliableEndpoint()).andReturn(
729: rme);
730: DestinationSequence.SequenceTermination st = seq.new SequenceTermination();
731: st.updateInactivityTimeout(30000L);
732: long lastAppMessage = System.currentTimeMillis() - 1000L;
733: EasyMock.expect(rme.getLastControlMessage()).andReturn(0L);
734: EasyMock.expect(rme.getLastApplicationMessage()).andReturn(
735: lastAppMessage);
736: EasyMock.expectLastCall();
737: control.replay();
738: st.run();
739: control.verify();
740: }
741:
742: @Test
743: public void testCanPiggybackAckOnPartialResponse() {
744: DestinationSequence seq = new DestinationSequence(id, ref,
745: destination);
746: AttributedURI uri = control.createMock(AttributedURI.class);
747: EasyMock.expect(ref.getAddress()).andReturn(uri);
748: String addr = "http://localhost:9999/reponses";
749: EasyMock.expect(uri.getValue()).andReturn(addr);
750: control.replay();
751: assertTrue(!seq.canPiggybackAckOnPartialResponse());
752: control.verify();
753: control.reset();
754: EasyMock.expect(ref.getAddress()).andReturn(uri);
755: EasyMock.expect(uri.getValue()).andReturn(
756: RMConstants.getAnonymousAddress());
757: control.replay();
758: assertTrue(seq.canPiggybackAckOnPartialResponse());
759: control.verify();
760: }
761:
762: @Test
763: public void testPurgeAcknowledged() {
764: destination = control.createMock(Destination.class);
765: DestinationSequence seq = new DestinationSequence(id, ref,
766: destination);
767: manager = control.createMock(RMManager.class);
768: EasyMock.expect(destination.getManager()).andReturn(manager);
769: RMStore store = control.createMock(RMStore.class);
770: EasyMock.expect(manager.getStore()).andReturn(store);
771: store.removeMessages(EasyMock.eq(id), CastUtils.cast(EasyMock
772: .isA(Collection.class), BigInteger.class), EasyMock
773: .eq(false));
774: EasyMock.expectLastCall();
775: control.replay();
776: seq.purgeAcknowledged(BigInteger.ONE);
777: control.verify();
778: }
779:
780: @Test
781: public void testCancelDeferredAcknowledgements() {
782: destination = control.createMock(Destination.class);
783: manager = control.createMock(RMManager.class);
784: EasyMock.expect(destination.getManager()).andReturn(manager);
785: Timer t = new Timer();
786: EasyMock.expect(manager.getTimer()).andReturn(t);
787: DestinationSequence seq = new DestinationSequence(id, ref,
788: destination);
789: control.replay();
790: seq.scheduleDeferredAcknowledgement(30000L);
791: seq.cancelDeferredAcknowledgments();
792: seq.cancelDeferredAcknowledgments();
793: t.cancel();
794: control.verify();
795: }
796:
797: @Test
798: public void testCancelTermination() {
799: destination = control.createMock(Destination.class);
800: manager = control.createMock(RMManager.class);
801: EasyMock.expect(destination.getManager()).andReturn(manager);
802: Timer t = new Timer();
803: EasyMock.expect(manager.getTimer()).andReturn(t);
804: DestinationSequence seq = new DestinationSequence(id, ref,
805: destination);
806: control.replay();
807: seq.scheduleSequenceTermination(30000L);
808: seq.cancelTermination();
809: t.cancel();
810: control.verify();
811: }
812:
813: private void setUpDestination() {
814: setUpDestination(null);
815: }
816:
817: private void setUpDestination(Timer timer) {
818:
819: manager = control.createMock(RMManager.class);
820:
821: org.apache.cxf.ws.rm.manager.ObjectFactory cfgFactory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
822: dp = cfgFactory.createDestinationPolicyType();
823: ap = cfgFactory.createAcksPolicyType();
824: dp.setAcksPolicy(ap);
825:
826: org.apache.cxf.ws.rm.policy.ObjectFactory policyFactory = new org.apache.cxf.ws.rm.policy.ObjectFactory();
827: rma = policyFactory.createRMAssertion();
828: BaseRetransmissionInterval bri = policyFactory
829: .createRMAssertionBaseRetransmissionInterval();
830: bri.setMilliseconds(new BigInteger("3000"));
831: rma.setBaseRetransmissionInterval(bri);
832:
833: EasyMock.expect(manager.getRMAssertion()).andReturn(rma)
834: .anyTimes();
835: EasyMock.expect(manager.getDestinationPolicy()).andReturn(dp)
836: .anyTimes();
837: EasyMock.expect(manager.getStore()).andReturn(null).anyTimes();
838:
839: destination = control.createMock(Destination.class);
840: EasyMock.expect(destination.getManager()).andReturn(manager)
841: .anyTimes();
842:
843: if (null != timer) {
844: EasyMock.expect(manager.getTimer()).andReturn(timer)
845: .anyTimes();
846: }
847:
848: }
849:
850: private Message setUpMessage(String messageNr) {
851: Message message = control.createMock(Message.class);
852: Exchange exchange = control.createMock(Exchange.class);
853: EasyMock.expect(message.getExchange()).andReturn(exchange);
854: EasyMock.expect(exchange.getOutMessage()).andReturn(null);
855: EasyMock.expect(exchange.getOutFaultMessage()).andReturn(null);
856: RMProperties rmps = control.createMock(RMProperties.class);
857: EasyMock.expect(
858: message.get(RMMessageConstants.RM_PROPERTIES_INBOUND))
859: .andReturn(rmps);
860: SequenceType st = control.createMock(SequenceType.class);
861: EasyMock.expect(rmps.getSequence()).andReturn(st);
862: BigInteger val = new BigInteger(messageNr);
863: EasyMock.expect(st.getMessageNumber()).andReturn(val);
864: return message;
865: }
866:
867: }
|