Source Code Cross Referenced for RetransmissionQueue.java in  » ESB » celtix-1.0 » org » objectweb » celtix » bus » ws » rm » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » celtix 1.0 » org.objectweb.celtix.bus.ws.rm 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.objectweb.celtix.bus.ws.rm;
002:
003:        import java.io.IOException;
004:        import java.math.BigInteger;
005:        import java.util.ArrayList;
006:        import java.util.Collection;
007:        import java.util.HashMap;
008:        import java.util.Iterator;
009:        import java.util.List;
010:        import java.util.Map;
011:        import java.util.Timer;
012:        import java.util.TimerTask;
013:        import java.util.logging.Level;
014:        import java.util.logging.Logger;
015:
016:        import javax.xml.namespace.QName;
017:        import javax.xml.soap.SOAPMessage;
018:        import javax.xml.ws.handler.Handler;
019:        import javax.xml.ws.handler.MessageContext;
020:
021:        import org.objectweb.celtix.bindings.AbstractBindingBase;
022:        import org.objectweb.celtix.bindings.AbstractBindingImpl;
023:        import org.objectweb.celtix.bindings.Request;
024:        import org.objectweb.celtix.bindings.Response;
025:        import org.objectweb.celtix.bindings.ServerRequest;
026:        import org.objectweb.celtix.bus.ws.addressing.ContextUtils;
027:        import org.objectweb.celtix.bus.ws.addressing.soap.MAPCodec;
028:        import org.objectweb.celtix.bus.ws.rm.soap.RMSoapHandler;
029:        import org.objectweb.celtix.common.logging.LogUtils;
030:        import org.objectweb.celtix.context.InputStreamMessageContext;
031:        import org.objectweb.celtix.context.ObjectMessageContext;
032:        import org.objectweb.celtix.context.ObjectMessageContextImpl;
033:        import org.objectweb.celtix.context.OutputStreamMessageContext;
034:        import org.objectweb.celtix.transports.ClientTransport;
035:        import org.objectweb.celtix.transports.ServerTransport;
036:        import org.objectweb.celtix.transports.Transport;
037:        import org.objectweb.celtix.workqueue.WorkQueue;
038:        import org.objectweb.celtix.ws.addressing.AddressingProperties;
039:        import org.objectweb.celtix.ws.rm.AckRequestedType;
040:        import org.objectweb.celtix.ws.rm.Identifier;
041:        import org.objectweb.celtix.ws.rm.RMProperties;
042:        import org.objectweb.celtix.ws.rm.SequenceType;
043:        import org.objectweb.celtix.ws.rm.persistence.RMMessage;
044:        import org.objectweb.celtix.ws.rm.persistence.RMStore;
045:        import org.objectweb.celtix.ws.rm.policy.RMAssertionType;
046:
047:        public class RetransmissionQueue {
048:            public static final QName EXPONENTIAL_BACKOFF_BASE_ATTR = new QName(
049:                    RMHandler.RM_CONFIGURATION_URI, "exponentialBackoffBase");
050:            public static final String DEFAULT_BASE_RETRANSMISSION_INTERVAL = "3000";
051:            public static final String DEFAULT_EXPONENTIAL_BACKOFF = "2";
052:            private static final String SOAP_MSG_KEY = "org.objectweb.celtix.bindings.soap.message";
053:            private static final Logger LOG = LogUtils
054:                    .getL7dLogger(RetransmissionQueue.class);
055:
056:            private RMHandler handler;
057:            private RMSoapHandler rmSOAPHandler;
058:            private MAPCodec wsaSOAPHandler;
059:            private WorkQueue workQueue;
060:            private long baseRetransmissionInterval;
061:            private int exponentialBackoff;
062:            private Map<String, List<ResendCandidate>> candidates;
063:            private Runnable resendInitiator;
064:            private boolean shutdown;
065:            private Resender resender;
066:            private Timer timer;
067:
068:            /**
069:             * Constructor.
070:             */
071:            public RetransmissionQueue(RMHandler h) {
072:                this (h, Long.parseLong(DEFAULT_BASE_RETRANSMISSION_INTERVAL),
073:                        Integer.parseInt(DEFAULT_EXPONENTIAL_BACKOFF));
074:            }
075:
076:            /**
077:             * Constructor.
078:             */
079:            public RetransmissionQueue(RMHandler h, RMAssertionType rma) {
080:                this (h, rma.getBaseRetransmissionInterval().getMilliseconds()
081:                        .longValue(), Integer.parseInt(rma
082:                        .getExponentialBackoff().getOtherAttributes().get(
083:                                EXPONENTIAL_BACKOFF_BASE_ATTR)));
084:            }
085:
086:            /**
087:             * Constructor.
088:             * 
089:             * @param base the base retransmission interval
090:             * @param backoff the exponential backoff
091:             */
092:            public RetransmissionQueue(RMHandler h, long base, int backoff) {
093:                handler = h;
094:                baseRetransmissionInterval = base;
095:                exponentialBackoff = backoff;
096:                candidates = new HashMap<String, List<ResendCandidate>>();
097:                resender = getDefaultResender();
098:            }
099:
100:            /**
101:             * Create default Resender logic.
102:             * 
103:             * @return default Resender
104:             */
105:            protected final Resender getDefaultResender() {
106:                return new Resender() {
107:                    public void resend(ObjectMessageContext context,
108:                            boolean requestAcknowledge) {
109:                        RMProperties properties = RMContextUtils
110:                                .retrieveRMProperties(context, true);
111:                        SequenceType st = properties.getSequence();
112:                        if (st != null) {
113:                            LOG.log(Level.INFO, "RESEND_MSG", st
114:                                    .getMessageNumber());
115:                        }
116:                        try {
117:                            refreshMAPs(context);
118:                            refreshRMProperties(context, requestAcknowledge);
119:                            if (ContextUtils.isRequestor(context)) {
120:                                clientResend(context);
121:                            } else {
122:                                serverResend(context);
123:                            }
124:                        } catch (Exception e) {
125:                            LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e);
126:                        }
127:                    }
128:                };
129:            };
130:
131:            /**
132:             * Refresh the MAPs with a new message ID (to avoid the resend being
133:             * rejected by the receiver-side WS-Addressing layer as a duplicate).
134:             * 
135:             * @param context the message context
136:             */
137:            private void refreshMAPs(MessageContext context) {
138:                AddressingProperties maps = ContextUtils.retrieveMAPs(context,
139:                        false, true);
140:                String uuid = ContextUtils.generateUUID();
141:                maps.setMessageID(ContextUtils.getAttributedURI(uuid));
142:            }
143:
144:            /**
145:             * Refresh the RM Properties with an AckRequested if necessary.
146:             * Currently the first resend for each sequence on each initiator iteration
147:             * includes an AckRequested. The idea is that a timely ACK may cause some of
148:             * of the resend to be avoided.
149:             * 
150:             * @param context the message context
151:             * @param requestAcknowledge true if an AckRequested header should be included 
152:             */
153:            private void refreshRMProperties(MessageContext context,
154:                    boolean requestAcknowledge) {
155:                RMProperties properties = RMContextUtils.retrieveRMProperties(
156:                        context, true);
157:                List<AckRequestedType> requests = null;
158:                if (requestAcknowledge) {
159:                    requests = new ArrayList<AckRequestedType>();
160:                    requests.add(RMUtils.getWSRMFactory()
161:                            .createAckRequestedType());
162:                    Identifier id = properties.getSequence().getIdentifier();
163:                    requests.get(0).setIdentifier(id);
164:                }
165:                properties.setAcksRequested(requests);
166:            }
167:
168:            /**
169:             * Create a client request for retransmission.
170:             * 
171:             * @param context the message context
172:             * @return an appropriate Request for the context
173:             */
174:            private Request createClientRequest(ObjectMessageContext context) {
175:                AbstractBindingBase binding = handler.getBinding();
176:                Transport transport = handler.getClientTransport();
177:                Request request = new Request(binding, transport, context);
178:                request.setOneway(ContextUtils.isOneway(context));
179:                return request;
180:            }
181:
182:            /**
183:             * Client-side resend.
184:             * 
185:             * @param context the message context
186:             */
187:            private void clientResend(ObjectMessageContext context)
188:                    throws IOException {
189:                Request request = createClientRequest(context);
190:                OutputStreamMessageContext outputStreamContext = request
191:                        .process(null, true, true);
192:                ClientTransport transport = handler.getClientTransport();
193:                if (transport != null) {
194:                    // decoupled response channel always being used with RM, 
195:                    // hence a partial response must be processed
196:                    invokePartial(request, transport, outputStreamContext);
197:                } else {
198:                    LOG.log(Level.WARNING, "NO_TRANSPORT_FOR_RESEND_MSG");
199:                }
200:            }
201:
202:            /**
203:             * Create a server request for retransmission.
204:             * 
205:             * @param context the message context
206:             * @return an appropriate ServerRequest for the context
207:             */
208:            private ServerRequest createServerRequest(
209:                    ObjectMessageContext context) {
210:                AbstractBindingBase binding = handler.getBinding();
211:                ServerRequest request = new ServerRequest(binding, context);
212:                // a server-originated resend implies a response, hence non-oneway
213:                request.setOneway(false);
214:                return request;
215:            }
216:
217:            /**
218:             * Server-side resend.
219:             * 
220:             * @param context the message context
221:             */
222:            private void serverResend(ObjectMessageContext context)
223:                    throws IOException {
224:                ServerTransport transport = handler.getServerTransport();
225:                if (transport != null) {
226:                    ServerRequest serverRequest = createServerRequest(context);
227:                    serverRequest.processOutbound(transport, null, true);
228:                } else {
229:                    LOG.log(Level.WARNING, "NO_TRANSPORT_FOR_RESEND_MSG");
230:                }
231:            }
232:
233:            /**
234:             * Invoke a oneway operation, allowing for a partial response.
235:             * 
236:             * @param request the request
237:             * @param transport the client transport 
238:             * @param outputStreamContext the output stream message context
239:             */
240:            private void invokePartial(Request request,
241:                    ClientTransport transport,
242:                    OutputStreamMessageContext outputStreamContext)
243:                    throws IOException {
244:                InputStreamMessageContext inputStreamContext = transport
245:                        .invoke(outputStreamContext);
246:                Response response = new Response(request);
247:                response.processProtocol(inputStreamContext);
248:                response.processLogical(null);
249:            }
250:
251:            /**
252:             * Populates the retransmission queue with messages recovered from persistent
253:             * store.
254:             *
255:             */
256:            protected void populate(Collection<SourceSequence> seqs) {
257:                LOG.fine(seqs.size() + " active sequences");
258:                RMStore store = handler.getStore();
259:                for (SourceSequence seq : seqs) {
260:                    Collection<RMMessage> msgs = store.getMessages(seq
261:                            .getIdentifier(), true);
262:                    LOG.fine("Recovered " + msgs.size()
263:                            + " messages for this sequence");
264:                    for (RMMessage msg : msgs) {
265:                        ObjectMessageContext objCtx = new ObjectMessageContextImpl();
266:                        objCtx.putAll(msg.getContext());
267:                        cacheUnacknowledged(objCtx);
268:                        LOG.fine("cached unacknowledged message nr: "
269:                                + msg.getMessageNr());
270:                    }
271:                }
272:            }
273:
274:            protected RMSoapHandler getRMSoapHandler() {
275:                if (null == rmSOAPHandler) {
276:                    AbstractBindingImpl abi = handler.getBinding()
277:                            .getBindingImpl();
278:                    List<Handler> handlerChain = abi
279:                            .getPostProtocolSystemHandlers();
280:                    for (Handler h : handlerChain) {
281:                        if (h instanceof  RMSoapHandler) {
282:                            rmSOAPHandler = (RMSoapHandler) h;
283:                        }
284:                    }
285:                }
286:                return rmSOAPHandler;
287:            }
288:
289:            protected MAPCodec getWsaSOAPHandler() {
290:                if (null == wsaSOAPHandler) {
291:                    AbstractBindingImpl abi = handler.getBinding()
292:                            .getBindingImpl();
293:                    List<Handler> handlerChain = abi
294:                            .getPostProtocolSystemHandlers();
295:                    for (Handler h : handlerChain) {
296:                        if (h instanceof  MAPCodec) {
297:                            wsaSOAPHandler = (MAPCodec) h;
298:                        }
299:                    }
300:                }
301:                return wsaSOAPHandler;
302:            }
303:
304:            /**
305:             * Plug in replacement resend logic (facilitates unit testing).
306:             *  
307:             * @param replacement resend logic
308:             */
309:            protected void replaceResender(Resender replacement) {
310:                resender = replacement;
311:            }
312:
313:            /**
314:             * Initiate resends.
315:             * 
316:             * @param queue the work queue providing async execution
317:             */
318:            protected void start(WorkQueue queue) {
319:                if (null == workQueue) {
320:                    LOG.fine("Starting retransmission queue");
321:                    workQueue = queue;
322:                    // workQueue.schedule(getResendInitiator(), baseRetransmissionInterval); 
323:
324:                    TimerTask task = new TimerTask() {
325:                        public void run() {
326:                            getResendInitiator().run();
327:                        }
328:                    };
329:                    timer = new Timer();
330:                    timer.schedule(task, getBaseRetransmissionInterval(),
331:                            getBaseRetransmissionInterval());
332:                }
333:            }
334:
335:            protected void stop() {
336:                if (null != timer) {
337:                    LOG.fine("Stopping retransmission queue");
338:                    timer.cancel();
339:                }
340:            }
341:
342:            /**
343:             * Accepts a new resend candidate.
344:             * 
345:             * @param ctx the message context.
346:             * @return ResendCandidate
347:             */
348:            protected ResendCandidate cacheUnacknowledged(
349:                    ObjectMessageContext ctx) {
350:                ResendCandidate candidate = null;
351:                RMProperties rmps = RMContextUtils.retrieveRMProperties(ctx,
352:                        true);
353:                if (null == rmps) {
354:                    SOAPMessage message = (SOAPMessage) ctx.get(SOAP_MSG_KEY);
355:                    rmps = getRMSoapHandler().unmarshalRMProperties(message);
356:                    RMContextUtils.storeRMProperties(ctx, rmps, true);
357:                }
358:                AddressingProperties maps = ContextUtils.retrieveMAPs(ctx,
359:                        false, true);
360:                if (null == maps) {
361:                    SOAPMessage message = (SOAPMessage) ctx.get(SOAP_MSG_KEY);
362:                    try {
363:                        maps = getWsaSOAPHandler().unmarshalMAPs(message);
364:                        ContextUtils.storeMAPs(maps, ctx, true);
365:                    } catch (Exception ex) {
366:                        ex.printStackTrace();
367:                    }
368:                }
369:
370:                SequenceType st = rmps.getSequence();
371:                Identifier sid = st.getIdentifier();
372:                synchronized (this ) {
373:                    String key = sid.getValue();
374:                    List<ResendCandidate> sequenceCandidates = getSequenceCandidates(key);
375:                    if (null == sequenceCandidates) {
376:                        sequenceCandidates = new ArrayList<ResendCandidate>();
377:                        candidates.put(key, sequenceCandidates);
378:                    }
379:                    candidate = new ResendCandidate(ctx);
380:                    sequenceCandidates.add(candidate);
381:                }
382:                return candidate;
383:            }
384:
385:            /**
386:             * Purge all candidates for the given sequence that
387:             * have been acknowledged.
388:             * 
389:             * @param seq the sequence object.
390:             */
391:            protected void purgeAcknowledged(SourceSequence seq) {
392:                Collection<BigInteger> purged = new ArrayList<BigInteger>();
393:                synchronized (this ) {
394:                    List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
395:                    if (null != sequenceCandidates) {
396:                        for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
397:                            ResendCandidate candidate = sequenceCandidates
398:                                    .get(i);
399:                            RMProperties properties = RMContextUtils
400:                                    .retrieveRMProperties(candidate
401:                                            .getContext(), true);
402:                            SequenceType st = properties.getSequence();
403:                            BigInteger m = st.getMessageNumber();
404:                            if (seq.isAcknowledged(m)) {
405:                                sequenceCandidates.remove(i);
406:                                candidate.resolved();
407:                                purged.add(m);
408:                            }
409:                        }
410:                    }
411:                }
412:                if (purged.size() > 0) {
413:                    handler.getStore().removeMessages(seq.getIdentifier(),
414:                            purged, true);
415:                }
416:            }
417:
418:            /**
419:             * @param seq the sequence under consideration
420:             * @return the number of unacknowledged messages for that sequence
421:             */
422:            protected synchronized int countUnacknowledged(SourceSequence seq) {
423:                List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
424:                return sequenceCandidates == null ? 0 : sequenceCandidates
425:                        .size();
426:            }
427:
428:            /**
429:             * @return a map relating sequence ID to a lists of un-acknowledged 
430:             * messages for that sequence
431:             */
432:            protected Map<String, List<ResendCandidate>> getUnacknowledged() {
433:                return candidates;
434:            }
435:
436:            /**
437:             * @param seq the sequence under consideration
438:             * @return the list of resend candidates for that sequence
439:             * @pre called with mutex held
440:             */
441:            protected List<ResendCandidate> getSequenceCandidates(
442:                    SourceSequence seq) {
443:                return getSequenceCandidates(seq.getIdentifier().getValue());
444:            }
445:
446:            /**
447:             * @param key the sequence identifier under consideration
448:             * @return the list of resend candidates for that sequence
449:             * @pre called with mutex held
450:             */
451:            protected List<ResendCandidate> getSequenceCandidates(String key) {
452:                return candidates.get(key);
453:            }
454:
455:            /**
456:             * @return the base retransmission interval
457:             */
458:            protected long getBaseRetransmissionInterval() {
459:                return baseRetransmissionInterval;
460:            }
461:
462:            /**
463:             * @return the exponential backoff
464:             */
465:            protected int getExponentialBackoff() {
466:                return exponentialBackoff;
467:            }
468:
469:            /**
470:             * Shutdown.
471:             */
472:            protected synchronized void shutdown() {
473:                shutdown = true;
474:            }
475:
476:            /**
477:             * @return true if shutdown
478:             */
479:            protected synchronized boolean isShutdown() {
480:                return shutdown;
481:            }
482:
483:            /**
484:             * @return the ResendInitiator
485:             */
486:            protected Runnable getResendInitiator() {
487:                if (resendInitiator == null) {
488:                    resendInitiator = new ResendInitiator();
489:                }
490:                return resendInitiator;
491:            }
492:
493:            /**
494:             * @param context the message context 
495:             * @return a ResendCandidate
496:             */
497:            protected ResendCandidate createResendCandidate(
498:                    ObjectMessageContext context) {
499:                return new ResendCandidate(context);
500:            }
501:
502:            /**
503:             * Manages scheduling of resend attempts.
504:             * A single task runs every base transmission interval,
505:             * determining which resend candidates are due a resend attempt.
506:             */
507:            protected class ResendInitiator implements  Runnable {
508:                public void run() {
509:                    // iterate over resend candidates, resending any that are due
510:                    synchronized (RetransmissionQueue.this ) {
511:                        Iterator<Map.Entry<String, List<ResendCandidate>>> sequences = candidates
512:                                .entrySet().iterator();
513:                        while (sequences.hasNext()) {
514:                            Iterator<ResendCandidate> sequenceCandidates = sequences
515:                                    .next().getValue().iterator();
516:                            boolean requestAck = true;
517:                            while (sequenceCandidates.hasNext()) {
518:                                ResendCandidate candidate = sequenceCandidates
519:                                        .next();
520:                                if (candidate.isDue()) {
521:                                    candidate.initiate(requestAck);
522:                                    requestAck = false;
523:                                }
524:                            }
525:                        }
526:                    }
527:                    /*
528:                    if (!isShutdown()) {
529:                        // schedule next resend initiation task (rescheduling each time,
530:                        // as opposed to scheduling a periodic task, eliminates the
531:                        // potential for simultaneous execution)
532:                        workQueue.schedule(this, getBaseRetransmissionInterval());
533:                    }
534:                     */
535:                }
536:            }
537:
538:            /**
539:             * Represents a candidate for resend, i.e. an unacked outgoing message.
540:             * When this is determined as due another resend attempt, an asynchronous
541:             * task is scheduled for this purpose.
542:             */
543:            protected class ResendCandidate implements  Runnable {
544:                private ObjectMessageContext context;
545:                private int skips;
546:                private int skipped;
547:                private boolean pending;
548:                private boolean includeAckRequested;
549:
550:                /**
551:                 * @param ctx message context for the unacked message
552:                 */
553:                protected ResendCandidate(ObjectMessageContext ctx) {
554:                    context = ctx;
555:                    skipped = -1;
556:                    skips = 1;
557:                }
558:
559:                /**
560:                 * Async resend logic.
561:                 */
562:                public void run() {
563:                    try {
564:                        // ensure ACK wasn't received while this task was enqueued
565:                        // on executor
566:                        if (isPending()) {
567:                            resender.resend(context, includeAckRequested);
568:                            includeAckRequested = false;
569:                        }
570:                    } finally {
571:                        attempted();
572:                    }
573:                }
574:
575:                /**
576:                 * @return true if candidate is due a resend
577:                 * REVISIT should bound the max number of resend attampts
578:                 */
579:                protected synchronized boolean isDue() {
580:                    boolean due = false;
581:                    // skip count is used to model exponential backoff
582:                    // to avoid gratuitous time evaluation
583:                    if (!pending && ++skipped == skips) {
584:                        skips *= getExponentialBackoff();
585:                        skipped = 0;
586:                        due = true;
587:                    }
588:                    return due;
589:                }
590:
591:                /**
592:                 * @return if resend attempt is pending
593:                 */
594:                protected synchronized boolean isPending() {
595:                    return pending;
596:                }
597:
598:                /**
599:                 * Initiate resend asynchronsly.
600:                 * 
601:                 * @param requestAcknowledge true if a AckRequest header is to be sent with
602:                 * resend
603:                 */
604:                protected synchronized void initiate(boolean requestAcknowledge) {
605:                    includeAckRequested = requestAcknowledge;
606:                    pending = true;
607:                    workQueue.execute(this );
608:                }
609:
610:                /**
611:                 * ACK has been received for this candidate.
612:                 */
613:                protected synchronized void resolved() {
614:                    pending = false;
615:                    skips = Integer.MAX_VALUE;
616:                }
617:
618:                /**
619:                 * @return associated message context
620:                 */
621:                protected MessageContext getContext() {
622:                    return context;
623:                }
624:
625:                /**
626:                 * A resend has been attempted.
627:                 */
628:                private synchronized void attempted() {
629:                    pending = false;
630:                }
631:            }
632:
633:            /**
634:             * Encapsulates actual resend logic (pluggable to facilitate unit testing).
635:             */
636:            public interface Resender {
637:                /**
638:                 * Resend mechanics.
639:                 * 
640:                 * @param context the message context of the message to resend
641:                 * @param requestAcknowledge true if an AckRequested should be included
642:                 */
643:                void resend(ObjectMessageContext context,
644:                        boolean requestAcknowledge);
645:            }
646:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.