001: package org.objectweb.celtix.bus.ws.rm;
002:
003: import java.io.IOException;
004: import java.math.BigInteger;
005: import java.util.ArrayList;
006: import java.util.Collection;
007: import java.util.HashMap;
008: import java.util.Iterator;
009: import java.util.List;
010: import java.util.Map;
011: import java.util.Timer;
012: import java.util.TimerTask;
013: import java.util.logging.Level;
014: import java.util.logging.Logger;
015:
016: import javax.xml.namespace.QName;
017: import javax.xml.soap.SOAPMessage;
018: import javax.xml.ws.handler.Handler;
019: import javax.xml.ws.handler.MessageContext;
020:
021: import org.objectweb.celtix.bindings.AbstractBindingBase;
022: import org.objectweb.celtix.bindings.AbstractBindingImpl;
023: import org.objectweb.celtix.bindings.Request;
024: import org.objectweb.celtix.bindings.Response;
025: import org.objectweb.celtix.bindings.ServerRequest;
026: import org.objectweb.celtix.bus.ws.addressing.ContextUtils;
027: import org.objectweb.celtix.bus.ws.addressing.soap.MAPCodec;
028: import org.objectweb.celtix.bus.ws.rm.soap.RMSoapHandler;
029: import org.objectweb.celtix.common.logging.LogUtils;
030: import org.objectweb.celtix.context.InputStreamMessageContext;
031: import org.objectweb.celtix.context.ObjectMessageContext;
032: import org.objectweb.celtix.context.ObjectMessageContextImpl;
033: import org.objectweb.celtix.context.OutputStreamMessageContext;
034: import org.objectweb.celtix.transports.ClientTransport;
035: import org.objectweb.celtix.transports.ServerTransport;
036: import org.objectweb.celtix.transports.Transport;
037: import org.objectweb.celtix.workqueue.WorkQueue;
038: import org.objectweb.celtix.ws.addressing.AddressingProperties;
039: import org.objectweb.celtix.ws.rm.AckRequestedType;
040: import org.objectweb.celtix.ws.rm.Identifier;
041: import org.objectweb.celtix.ws.rm.RMProperties;
042: import org.objectweb.celtix.ws.rm.SequenceType;
043: import org.objectweb.celtix.ws.rm.persistence.RMMessage;
044: import org.objectweb.celtix.ws.rm.persistence.RMStore;
045: import org.objectweb.celtix.ws.rm.policy.RMAssertionType;
046:
047: public class RetransmissionQueue {
048: public static final QName EXPONENTIAL_BACKOFF_BASE_ATTR = new QName(
049: RMHandler.RM_CONFIGURATION_URI, "exponentialBackoffBase");
050: public static final String DEFAULT_BASE_RETRANSMISSION_INTERVAL = "3000";
051: public static final String DEFAULT_EXPONENTIAL_BACKOFF = "2";
052: private static final String SOAP_MSG_KEY = "org.objectweb.celtix.bindings.soap.message";
053: private static final Logger LOG = LogUtils
054: .getL7dLogger(RetransmissionQueue.class);
055:
056: private RMHandler handler;
057: private RMSoapHandler rmSOAPHandler;
058: private MAPCodec wsaSOAPHandler;
059: private WorkQueue workQueue;
060: private long baseRetransmissionInterval;
061: private int exponentialBackoff;
062: private Map<String, List<ResendCandidate>> candidates;
063: private Runnable resendInitiator;
064: private boolean shutdown;
065: private Resender resender;
066: private Timer timer;
067:
/**
 * Creates a queue for the given handler using the default base
 * retransmission interval and the default exponential backoff.
 *
 * @param h the RM handler on whose behalf messages are resent
 */
public RetransmissionQueue(RMHandler h) {
    this(h,
         Long.parseLong(DEFAULT_BASE_RETRANSMISSION_INTERVAL),
         Integer.parseInt(DEFAULT_EXPONENTIAL_BACKOFF));
}
075:
/**
 * Creates a queue for the given handler, taking the base retransmission
 * interval and the exponential backoff from an RM assertion.
 *
 * @param h the RM handler on whose behalf messages are resent
 * @param rma the RM assertion; its base retransmission interval (in
 *            milliseconds) and the {@link #EXPONENTIAL_BACKOFF_BASE_ATTR}
 *            attribute of its exponential backoff element are read
 */
public RetransmissionQueue(RMHandler h, RMAssertionType rma) {
    this(h,
         rma.getBaseRetransmissionInterval().getMilliseconds().longValue(),
         Integer.parseInt(
             rma.getExponentialBackoff().getOtherAttributes().get(EXPONENTIAL_BACKOFF_BASE_ATTR)));
}
085:
/**
 * Creates a queue with explicit timing parameters. The queue starts
 * out empty and uses the default resend logic.
 *
 * @param h the RM handler on whose behalf messages are resent
 * @param base the base retransmission interval
 * @param backoff the exponential backoff
 */
public RetransmissionQueue(RMHandler h, long base, int backoff) {
    this.handler = h;
    this.baseRetransmissionInterval = base;
    this.exponentialBackoff = backoff;
    this.candidates = new HashMap<String, List<ResendCandidate>>();
    this.resender = getDefaultResender();
}
099:
100: /**
101: * Create default Resender logic.
102: *
103: * @return default Resender
104: */
105: protected final Resender getDefaultResender() {
106: return new Resender() {
107: public void resend(ObjectMessageContext context,
108: boolean requestAcknowledge) {
109: RMProperties properties = RMContextUtils
110: .retrieveRMProperties(context, true);
111: SequenceType st = properties.getSequence();
112: if (st != null) {
113: LOG.log(Level.INFO, "RESEND_MSG", st
114: .getMessageNumber());
115: }
116: try {
117: refreshMAPs(context);
118: refreshRMProperties(context, requestAcknowledge);
119: if (ContextUtils.isRequestor(context)) {
120: clientResend(context);
121: } else {
122: serverResend(context);
123: }
124: } catch (Exception e) {
125: LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e);
126: }
127: }
128: };
129: };
130:
131: /**
132: * Refresh the MAPs with a new message ID (to avoid the resend being
133: * rejected by the receiver-side WS-Addressing layer as a duplicate).
134: *
135: * @param context the message context
136: */
137: private void refreshMAPs(MessageContext context) {
138: AddressingProperties maps = ContextUtils.retrieveMAPs(context,
139: false, true);
140: String uuid = ContextUtils.generateUUID();
141: maps.setMessageID(ContextUtils.getAttributedURI(uuid));
142: }
143:
144: /**
145: * Refresh the RM Properties with an AckRequested if necessary.
146: * Currently the first resend for each sequence on each initiator iteration
147: * includes an AckRequested. The idea is that a timely ACK may cause some of
148: * of the resend to be avoided.
149: *
150: * @param context the message context
151: * @param requestAcknowledge true if an AckRequested header should be included
152: */
153: private void refreshRMProperties(MessageContext context,
154: boolean requestAcknowledge) {
155: RMProperties properties = RMContextUtils.retrieveRMProperties(
156: context, true);
157: List<AckRequestedType> requests = null;
158: if (requestAcknowledge) {
159: requests = new ArrayList<AckRequestedType>();
160: requests.add(RMUtils.getWSRMFactory()
161: .createAckRequestedType());
162: Identifier id = properties.getSequence().getIdentifier();
163: requests.get(0).setIdentifier(id);
164: }
165: properties.setAcksRequested(requests);
166: }
167:
168: /**
169: * Create a client request for retransmission.
170: *
171: * @param context the message context
172: * @return an appropriate Request for the context
173: */
174: private Request createClientRequest(ObjectMessageContext context) {
175: AbstractBindingBase binding = handler.getBinding();
176: Transport transport = handler.getClientTransport();
177: Request request = new Request(binding, transport, context);
178: request.setOneway(ContextUtils.isOneway(context));
179: return request;
180: }
181:
182: /**
183: * Client-side resend.
184: *
185: * @param context the message context
186: */
187: private void clientResend(ObjectMessageContext context)
188: throws IOException {
189: Request request = createClientRequest(context);
190: OutputStreamMessageContext outputStreamContext = request
191: .process(null, true, true);
192: ClientTransport transport = handler.getClientTransport();
193: if (transport != null) {
194: // decoupled response channel always being used with RM,
195: // hence a partial response must be processed
196: invokePartial(request, transport, outputStreamContext);
197: } else {
198: LOG.log(Level.WARNING, "NO_TRANSPORT_FOR_RESEND_MSG");
199: }
200: }
201:
202: /**
203: * Create a server request for retransmission.
204: *
205: * @param context the message context
206: * @return an appropriate ServerRequest for the context
207: */
208: private ServerRequest createServerRequest(
209: ObjectMessageContext context) {
210: AbstractBindingBase binding = handler.getBinding();
211: ServerRequest request = new ServerRequest(binding, context);
212: // a server-originated resend implies a response, hence non-oneway
213: request.setOneway(false);
214: return request;
215: }
216:
217: /**
218: * Server-side resend.
219: *
220: * @param context the message context
221: */
222: private void serverResend(ObjectMessageContext context)
223: throws IOException {
224: ServerTransport transport = handler.getServerTransport();
225: if (transport != null) {
226: ServerRequest serverRequest = createServerRequest(context);
227: serverRequest.processOutbound(transport, null, true);
228: } else {
229: LOG.log(Level.WARNING, "NO_TRANSPORT_FOR_RESEND_MSG");
230: }
231: }
232:
233: /**
234: * Invoke a oneway operation, allowing for a partial response.
235: *
236: * @param request the request
237: * @param transport the client transport
238: * @param outputStreamContext the output stream message context
239: */
240: private void invokePartial(Request request,
241: ClientTransport transport,
242: OutputStreamMessageContext outputStreamContext)
243: throws IOException {
244: InputStreamMessageContext inputStreamContext = transport
245: .invoke(outputStreamContext);
246: Response response = new Response(request);
247: response.processProtocol(inputStreamContext);
248: response.processLogical(null);
249: }
250:
251: /**
252: * Populates the retransmission queue with messages recovered from persistent
253: * store.
254: *
255: */
256: protected void populate(Collection<SourceSequence> seqs) {
257: LOG.fine(seqs.size() + " active sequences");
258: RMStore store = handler.getStore();
259: for (SourceSequence seq : seqs) {
260: Collection<RMMessage> msgs = store.getMessages(seq
261: .getIdentifier(), true);
262: LOG.fine("Recovered " + msgs.size()
263: + " messages for this sequence");
264: for (RMMessage msg : msgs) {
265: ObjectMessageContext objCtx = new ObjectMessageContextImpl();
266: objCtx.putAll(msg.getContext());
267: cacheUnacknowledged(objCtx);
268: LOG.fine("cached unacknowledged message nr: "
269: + msg.getMessageNr());
270: }
271: }
272: }
273:
274: protected RMSoapHandler getRMSoapHandler() {
275: if (null == rmSOAPHandler) {
276: AbstractBindingImpl abi = handler.getBinding()
277: .getBindingImpl();
278: List<Handler> handlerChain = abi
279: .getPostProtocolSystemHandlers();
280: for (Handler h : handlerChain) {
281: if (h instanceof RMSoapHandler) {
282: rmSOAPHandler = (RMSoapHandler) h;
283: }
284: }
285: }
286: return rmSOAPHandler;
287: }
288:
289: protected MAPCodec getWsaSOAPHandler() {
290: if (null == wsaSOAPHandler) {
291: AbstractBindingImpl abi = handler.getBinding()
292: .getBindingImpl();
293: List<Handler> handlerChain = abi
294: .getPostProtocolSystemHandlers();
295: for (Handler h : handlerChain) {
296: if (h instanceof MAPCodec) {
297: wsaSOAPHandler = (MAPCodec) h;
298: }
299: }
300: }
301: return wsaSOAPHandler;
302: }
303:
304: /**
305: * Plug in replacement resend logic (facilitates unit testing).
306: *
307: * @param replacement resend logic
308: */
309: protected void replaceResender(Resender replacement) {
310: resender = replacement;
311: }
312:
313: /**
314: * Initiate resends.
315: *
316: * @param queue the work queue providing async execution
317: */
318: protected void start(WorkQueue queue) {
319: if (null == workQueue) {
320: LOG.fine("Starting retransmission queue");
321: workQueue = queue;
322: // workQueue.schedule(getResendInitiator(), baseRetransmissionInterval);
323:
324: TimerTask task = new TimerTask() {
325: public void run() {
326: getResendInitiator().run();
327: }
328: };
329: timer = new Timer();
330: timer.schedule(task, getBaseRetransmissionInterval(),
331: getBaseRetransmissionInterval());
332: }
333: }
334:
335: protected void stop() {
336: if (null != timer) {
337: LOG.fine("Stopping retransmission queue");
338: timer.cancel();
339: }
340: }
341:
342: /**
343: * Accepts a new resend candidate.
344: *
345: * @param ctx the message context.
346: * @return ResendCandidate
347: */
348: protected ResendCandidate cacheUnacknowledged(
349: ObjectMessageContext ctx) {
350: ResendCandidate candidate = null;
351: RMProperties rmps = RMContextUtils.retrieveRMProperties(ctx,
352: true);
353: if (null == rmps) {
354: SOAPMessage message = (SOAPMessage) ctx.get(SOAP_MSG_KEY);
355: rmps = getRMSoapHandler().unmarshalRMProperties(message);
356: RMContextUtils.storeRMProperties(ctx, rmps, true);
357: }
358: AddressingProperties maps = ContextUtils.retrieveMAPs(ctx,
359: false, true);
360: if (null == maps) {
361: SOAPMessage message = (SOAPMessage) ctx.get(SOAP_MSG_KEY);
362: try {
363: maps = getWsaSOAPHandler().unmarshalMAPs(message);
364: ContextUtils.storeMAPs(maps, ctx, true);
365: } catch (Exception ex) {
366: ex.printStackTrace();
367: }
368: }
369:
370: SequenceType st = rmps.getSequence();
371: Identifier sid = st.getIdentifier();
372: synchronized (this ) {
373: String key = sid.getValue();
374: List<ResendCandidate> sequenceCandidates = getSequenceCandidates(key);
375: if (null == sequenceCandidates) {
376: sequenceCandidates = new ArrayList<ResendCandidate>();
377: candidates.put(key, sequenceCandidates);
378: }
379: candidate = new ResendCandidate(ctx);
380: sequenceCandidates.add(candidate);
381: }
382: return candidate;
383: }
384:
385: /**
386: * Purge all candidates for the given sequence that
387: * have been acknowledged.
388: *
389: * @param seq the sequence object.
390: */
391: protected void purgeAcknowledged(SourceSequence seq) {
392: Collection<BigInteger> purged = new ArrayList<BigInteger>();
393: synchronized (this ) {
394: List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
395: if (null != sequenceCandidates) {
396: for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
397: ResendCandidate candidate = sequenceCandidates
398: .get(i);
399: RMProperties properties = RMContextUtils
400: .retrieveRMProperties(candidate
401: .getContext(), true);
402: SequenceType st = properties.getSequence();
403: BigInteger m = st.getMessageNumber();
404: if (seq.isAcknowledged(m)) {
405: sequenceCandidates.remove(i);
406: candidate.resolved();
407: purged.add(m);
408: }
409: }
410: }
411: }
412: if (purged.size() > 0) {
413: handler.getStore().removeMessages(seq.getIdentifier(),
414: purged, true);
415: }
416: }
417:
418: /**
419: * @param seq the sequence under consideration
420: * @return the number of unacknowledged messages for that sequence
421: */
422: protected synchronized int countUnacknowledged(SourceSequence seq) {
423: List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
424: return sequenceCandidates == null ? 0 : sequenceCandidates
425: .size();
426: }
427:
428: /**
429: * @return a map relating sequence ID to a lists of un-acknowledged
430: * messages for that sequence
431: */
432: protected Map<String, List<ResendCandidate>> getUnacknowledged() {
433: return candidates;
434: }
435:
436: /**
437: * @param seq the sequence under consideration
438: * @return the list of resend candidates for that sequence
439: * @pre called with mutex held
440: */
441: protected List<ResendCandidate> getSequenceCandidates(
442: SourceSequence seq) {
443: return getSequenceCandidates(seq.getIdentifier().getValue());
444: }
445:
446: /**
447: * @param key the sequence identifier under consideration
448: * @return the list of resend candidates for that sequence
449: * @pre called with mutex held
450: */
451: protected List<ResendCandidate> getSequenceCandidates(String key) {
452: return candidates.get(key);
453: }
454:
455: /**
456: * @return the base retransmission interval
457: */
458: protected long getBaseRetransmissionInterval() {
459: return baseRetransmissionInterval;
460: }
461:
462: /**
463: * @return the exponential backoff
464: */
465: protected int getExponentialBackoff() {
466: return exponentialBackoff;
467: }
468:
469: /**
470: * Shutdown.
471: */
472: protected synchronized void shutdown() {
473: shutdown = true;
474: }
475:
476: /**
477: * @return true if shutdown
478: */
479: protected synchronized boolean isShutdown() {
480: return shutdown;
481: }
482:
483: /**
484: * @return the ResendInitiator
485: */
486: protected Runnable getResendInitiator() {
487: if (resendInitiator == null) {
488: resendInitiator = new ResendInitiator();
489: }
490: return resendInitiator;
491: }
492:
493: /**
494: * @param context the message context
495: * @return a ResendCandidate
496: */
497: protected ResendCandidate createResendCandidate(
498: ObjectMessageContext context) {
499: return new ResendCandidate(context);
500: }
501:
502: /**
503: * Manages scheduling of resend attempts.
504: * A single task runs every base transmission interval,
505: * determining which resend candidates are due a resend attempt.
506: */
507: protected class ResendInitiator implements Runnable {
508: public void run() {
509: // iterate over resend candidates, resending any that are due
510: synchronized (RetransmissionQueue.this ) {
511: Iterator<Map.Entry<String, List<ResendCandidate>>> sequences = candidates
512: .entrySet().iterator();
513: while (sequences.hasNext()) {
514: Iterator<ResendCandidate> sequenceCandidates = sequences
515: .next().getValue().iterator();
516: boolean requestAck = true;
517: while (sequenceCandidates.hasNext()) {
518: ResendCandidate candidate = sequenceCandidates
519: .next();
520: if (candidate.isDue()) {
521: candidate.initiate(requestAck);
522: requestAck = false;
523: }
524: }
525: }
526: }
527: /*
528: if (!isShutdown()) {
529: // schedule next resend initiation task (rescheduling each time,
530: // as opposed to scheduling a periodic task, eliminates the
531: // potential for simultaneous execution)
532: workQueue.schedule(this, getBaseRetransmissionInterval());
533: }
534: */
535: }
536: }
537:
538: /**
539: * Represents a candidate for resend, i.e. an unacked outgoing message.
540: * When this is determined as due another resend attempt, an asynchronous
541: * task is scheduled for this purpose.
542: */
543: protected class ResendCandidate implements Runnable {
544: private ObjectMessageContext context;
545: private int skips;
546: private int skipped;
547: private boolean pending;
548: private boolean includeAckRequested;
549:
550: /**
551: * @param ctx message context for the unacked message
552: */
553: protected ResendCandidate(ObjectMessageContext ctx) {
554: context = ctx;
555: skipped = -1;
556: skips = 1;
557: }
558:
559: /**
560: * Async resend logic.
561: */
562: public void run() {
563: try {
564: // ensure ACK wasn't received while this task was enqueued
565: // on executor
566: if (isPending()) {
567: resender.resend(context, includeAckRequested);
568: includeAckRequested = false;
569: }
570: } finally {
571: attempted();
572: }
573: }
574:
575: /**
576: * @return true if candidate is due a resend
577: * REVISIT should bound the max number of resend attampts
578: */
579: protected synchronized boolean isDue() {
580: boolean due = false;
581: // skip count is used to model exponential backoff
582: // to avoid gratuitous time evaluation
583: if (!pending && ++skipped == skips) {
584: skips *= getExponentialBackoff();
585: skipped = 0;
586: due = true;
587: }
588: return due;
589: }
590:
591: /**
592: * @return if resend attempt is pending
593: */
594: protected synchronized boolean isPending() {
595: return pending;
596: }
597:
598: /**
599: * Initiate resend asynchronsly.
600: *
601: * @param requestAcknowledge true if a AckRequest header is to be sent with
602: * resend
603: */
604: protected synchronized void initiate(boolean requestAcknowledge) {
605: includeAckRequested = requestAcknowledge;
606: pending = true;
607: workQueue.execute(this );
608: }
609:
610: /**
611: * ACK has been received for this candidate.
612: */
613: protected synchronized void resolved() {
614: pending = false;
615: skips = Integer.MAX_VALUE;
616: }
617:
618: /**
619: * @return associated message context
620: */
621: protected MessageContext getContext() {
622: return context;
623: }
624:
625: /**
626: * A resend has been attempted.
627: */
628: private synchronized void attempted() {
629: pending = false;
630: }
631: }
632:
633: /**
634: * Encapsulates actual resend logic (pluggable to facilitate unit testing)
635: */
636: public interface Resender {
637: /**
638: * Resend mechanics.
639: *
640: * @param context the cloned message context.
641: * @param if a AckRequest should be included
642: */
643: void resend(ObjectMessageContext context,
644: boolean requestAcknowledge);
645: }
646: }
|