001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm.soap;
019:
020: import java.io.ByteArrayInputStream;
021: import java.io.ByteArrayOutputStream;
022: import java.io.IOException;
023: import java.io.OutputStream;
024: import java.math.BigInteger;
025: import java.util.ArrayList;
026: import java.util.Collection;
027: import java.util.Date;
028: import java.util.HashMap;
029: import java.util.List;
030: import java.util.Map;
031: import java.util.TimerTask;
032: import java.util.concurrent.Executor;
033: import java.util.concurrent.RejectedExecutionException;
034: import java.util.logging.Level;
035: import java.util.logging.Logger;
036:
037: import org.apache.cxf.common.logging.LogUtils;
038: import org.apache.cxf.endpoint.ConduitSelector;
039: import org.apache.cxf.endpoint.DeferredConduitSelector;
040: import org.apache.cxf.endpoint.Endpoint;
041: import org.apache.cxf.io.CachedOutputStream;
042: import org.apache.cxf.io.CachedOutputStreamCallback;
043: import org.apache.cxf.message.Message;
044: import org.apache.cxf.message.MessageUtils;
045: import org.apache.cxf.service.model.EndpointInfo;
046: import org.apache.cxf.transport.Conduit;
047: import org.apache.cxf.transport.MessageObserver;
048: import org.apache.cxf.ws.addressing.AddressingProperties;
049: import org.apache.cxf.ws.addressing.AttributedURIType;
050: import org.apache.cxf.ws.policy.AssertionInfo;
051: import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
052: import org.apache.cxf.ws.rm.Identifier;
053: import org.apache.cxf.ws.rm.RMContextUtils;
054: import org.apache.cxf.ws.rm.RMManager;
055: import org.apache.cxf.ws.rm.RMMessageConstants;
056: import org.apache.cxf.ws.rm.RMProperties;
057: import org.apache.cxf.ws.rm.RMUtils;
058: import org.apache.cxf.ws.rm.RetransmissionCallback;
059: import org.apache.cxf.ws.rm.RetransmissionQueue;
060: import org.apache.cxf.ws.rm.SequenceType;
061: import org.apache.cxf.ws.rm.SourceSequence;
062: import org.apache.cxf.ws.rm.persistence.RMStore;
063: import org.apache.cxf.ws.rm.policy.PolicyUtils;
064: import org.apache.cxf.ws.rm.policy.RMAssertion;
065:
066: /**
067: *
068: */
069: public class RetransmissionQueueImpl implements RetransmissionQueue {
070:
071: private static final Logger LOG = LogUtils
072: .getL7dLogger(RetransmissionQueueImpl.class);
073:
074: private Map<String, List<ResendCandidate>> candidates = new HashMap<String, List<ResendCandidate>>();
075: private Resender resender;
076: private RMManager manager;
077:
078: public RetransmissionQueueImpl(RMManager m) {
079: manager = m;
080: }
081:
082: public RMManager getManager() {
083: return manager;
084: }
085:
086: public void setManager(RMManager m) {
087: manager = m;
088: }
089:
090: public void addUnacknowledged(Message message) {
091: cacheUnacknowledged(message);
092: }
093:
094: /**
095: * @param seq the sequence under consideration
096: * @return the number of unacknowledged messages for that sequence
097: */
098: public synchronized int countUnacknowledged(SourceSequence seq) {
099: List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
100: return sequenceCandidates == null ? 0 : sequenceCandidates
101: .size();
102: }
103:
104: /**
105: * @return true if there are no unacknowledged messages in the queue
106: */
107: public boolean isEmpty() {
108: return 0 == getUnacknowledged().size();
109: }
110:
111: /**
112: * Purge all candidates for the given sequence that have been acknowledged.
113: *
114: * @param seq the sequence object.
115: */
116: public void purgeAcknowledged(SourceSequence seq) {
117: Collection<BigInteger> purged = new ArrayList<BigInteger>();
118: synchronized (this ) {
119: LOG.fine("Start purging resend candidates.");
120: List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
121: if (null != sequenceCandidates) {
122: for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
123: ResendCandidate candidate = sequenceCandidates
124: .get(i);
125: RMProperties properties = RMContextUtils
126: .retrieveRMProperties(candidate
127: .getMessage(), true);
128: SequenceType st = properties.getSequence();
129: BigInteger m = st.getMessageNumber();
130: if (seq.isAcknowledged(m)) {
131: sequenceCandidates.remove(i);
132: candidate.resolved();
133: purged.add(m);
134: }
135: }
136: }
137: LOG.fine("Completed purging resend candidates.");
138: }
139: if (purged.size() > 0) {
140: RMStore store = manager.getStore();
141: if (null != store) {
142: store.removeMessages(seq.getIdentifier(), purged, true);
143: }
144: }
145: }
146:
147: /**
148: * Initiate resends.
149: */
150: public void start() {
151: if (null != resender) {
152: return;
153: }
154: LOG.fine("Starting retransmission queue");
155:
156: // setup resender
157:
158: resender = getDefaultResender();
159: }
160:
161: /**
162: * Stops resending messages for the specified source sequence.
163: */
164: public void stop(SourceSequence seq) {
165: synchronized (this ) {
166: List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
167: if (null != sequenceCandidates) {
168: for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
169: ResendCandidate candidate = sequenceCandidates
170: .get(i);
171: candidate.cancel();
172: }
173: LOG.log(Level.FINE,
174: "Cancelled resends for sequence {0}.", seq
175: .getIdentifier().getValue());
176: }
177: }
178: }
179:
180: void stop() {
181:
182: }
183:
184: /**
185: * @return the exponential backoff
186: */
187: protected int getExponentialBackoff() {
188: return DEFAULT_EXPONENTIAL_BACKOFF;
189: }
190:
191: /**
192: * @param message the message context
193: * @return a ResendCandidate
194: */
195: protected ResendCandidate createResendCandidate(Message message) {
196: return new ResendCandidate(message);
197: }
198:
199: /**
200: * Accepts a new resend candidate.
201: *
202: * @param ctx the message context.
203: * @return ResendCandidate
204: */
205: protected ResendCandidate cacheUnacknowledged(Message message) {
206: RMProperties rmps = RMContextUtils.retrieveRMProperties(
207: message, true);
208: SequenceType st = rmps.getSequence();
209: Identifier sid = st.getIdentifier();
210: String key = sid.getValue();
211:
212: ResendCandidate candidate = null;
213:
214: synchronized (this ) {
215: List<ResendCandidate> sequenceCandidates = getSequenceCandidates(key);
216: if (null == sequenceCandidates) {
217: sequenceCandidates = new ArrayList<ResendCandidate>();
218: candidates.put(key, sequenceCandidates);
219: }
220: candidate = new ResendCandidate(message);
221: sequenceCandidates.add(candidate);
222: }
223: LOG.fine("Cached unacknowledged message.");
224: return candidate;
225: }
226:
227: /**
228: * @return a map relating sequence ID to a lists of un-acknowledged messages
229: * for that sequence
230: */
231: protected Map<String, List<ResendCandidate>> getUnacknowledged() {
232: return candidates;
233: }
234:
235: /**
236: * @param seq the sequence under consideration
237: * @return the list of resend candidates for that sequence
238: * @pre called with mutex held
239: */
240: protected List<ResendCandidate> getSequenceCandidates(
241: SourceSequence seq) {
242: return getSequenceCandidates(seq.getIdentifier().getValue());
243: }
244:
245: /**
246: * @param key the sequence identifier under consideration
247: * @return the list of resend candidates for that sequence
248: * @pre called with mutex held
249: */
250: protected List<ResendCandidate> getSequenceCandidates(String key) {
251: return candidates.get(key);
252: }
253:
254: private void clientResend(Message message) {
255: Conduit c = message.getExchange().getConduit(message);
256: resend(c, message);
257: }
258:
259: private void serverResend(Message message) {
260:
261: // get the message's to address
262:
263: AddressingProperties maps = RMContextUtils.retrieveMAPs(
264: message, false, true);
265: AttributedURIType to = null;
266: if (null != maps) {
267: to = maps.getTo();
268: }
269: if (null == to) {
270: LOG.log(Level.SEVERE, "NO_ADDRESS_FOR_RESEND_MSG");
271: return;
272: }
273:
274: final String address = to.getValue();
275: LOG.fine("Resending to address: " + address);
276:
277: final Endpoint reliableEndpoint = manager.getReliableEndpoint(
278: message).getEndpoint();
279:
280: ConduitSelector cs = new DeferredConduitSelector() {
281: @Override
282: public synchronized Conduit selectConduit(Message message) {
283: Conduit conduit = null;
284: EndpointInfo endpointInfo = reliableEndpoint
285: .getEndpointInfo();
286: org.apache.cxf.ws.addressing.EndpointReferenceType original = endpointInfo
287: .getTarget();
288: try {
289: if (null != address) {
290: endpointInfo.setAddress(address);
291: }
292: conduit = super .selectConduit(message);
293: } finally {
294: endpointInfo.setAddress(original);
295: }
296: return conduit;
297: }
298: };
299:
300: cs.setEndpoint(reliableEndpoint);
301: Conduit c = cs.selectConduit(message);
302: // REVISIT
303: // use application endpoint message observer instead?
304: c.setMessageObserver(new MessageObserver() {
305: public void onMessage(Message message) {
306: LOG.fine("Ignoring response to resent message.");
307: }
308:
309: });
310: resend(c, message);
311: }
312:
313: private void resend(Conduit c, Message message) {
314: try {
315:
316: // get registered callbacks, create new output stream and
317: // re-register
318: // all callbacks except the retransmission callback
319:
320: OutputStream os = message.getContent(OutputStream.class);
321: List<CachedOutputStreamCallback> callbacks = null;
322:
323: if (os instanceof CachedOutputStream) {
324: callbacks = ((CachedOutputStream) os).getCallbacks();
325: }
326:
327: c.prepare(message);
328:
329: os = message.getContent(OutputStream.class);
330: if (null != callbacks && callbacks.size() > 1) {
331: if (!(os instanceof CachedOutputStream)) {
332: os = RMUtils.createCachedStream(message, os);
333: }
334: for (CachedOutputStreamCallback cb : callbacks) {
335: if (!(cb instanceof RetransmissionCallback)) {
336: ((CachedOutputStream) os).registerCallback(cb);
337: }
338: }
339: }
340: ByteArrayOutputStream savedOutputStream = (ByteArrayOutputStream) message
341: .get(RMMessageConstants.SAVED_OUTPUT_STREAM);
342: byte[] content = null;
343: if (null == savedOutputStream) {
344: content = message.getContent(byte[].class);
345: LOG.fine("Using saved byte array: " + content);
346: } else {
347: content = savedOutputStream.toByteArray();
348: LOG.fine("Using saved output stream: "
349: + savedOutputStream);
350: }
351: ByteArrayInputStream bis = new ByteArrayInputStream(content);
352:
353: // copy saved output stream to new output stream in chunks of 1024
354: CachedOutputStream.copyStream(bis, os, 1024);
355: os.flush();
356: os.close();
357: } catch (IOException ex) {
358: LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex);
359: }
360: }
361:
362: /**
363: * Represents a candidate for resend, i.e. an unacked outgoing message.
364: */
365: protected class ResendCandidate implements Runnable {
366: private Message message;
367: private Date next;
368: private TimerTask nextTask;
369: private int resends;
370: private long nextInterval;
371: private long backoff;
372: private boolean pending;
373: private boolean includeAckRequested;
374:
375: /**
376: * @param ctx message context for the unacked message
377: */
378: protected ResendCandidate(Message m) {
379: message = m;
380: resends = 0;
381: RMAssertion rma = PolicyUtils.getRMAssertion(manager
382: .getRMAssertion(), message);
383: long baseRetransmissionInterval = rma
384: .getBaseRetransmissionInterval().getMilliseconds()
385: .longValue();
386: backoff = null != rma.getExponentialBackoff() ? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF
387: : 1;
388: next = new Date(System.currentTimeMillis()
389: + baseRetransmissionInterval);
390: nextInterval = baseRetransmissionInterval * backoff;
391: if (null != manager.getTimer()) {
392: schedule();
393: }
394: }
395:
396: /**
397: * Initiate resend asynchronsly.
398: *
399: * @param requestAcknowledge true if a AckRequest header is to be sent
400: * with resend
401: */
402: protected void initiate(boolean requestAcknowledge) {
403: includeAckRequested = requestAcknowledge;
404: pending = true;
405: Endpoint ep = message.getExchange().get(Endpoint.class);
406: Executor executor = ep.getExecutor();
407: if (null == executor) {
408: executor = ep.getService().getExecutor();
409: LOG.log(Level.FINE, "Using service executor {0}",
410: executor.getClass().getName());
411: } else {
412: LOG.log(Level.FINE, "Using endpoint executor {0}",
413: executor.getClass().getName());
414: }
415:
416: try {
417: executor.execute(this );
418: } catch (RejectedExecutionException ex) {
419: LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG",
420: ex);
421: }
422: }
423:
424: public void run() {
425: try {
426: // ensure ACK wasn't received while this task was enqueued
427: // on executor
428: if (isPending()) {
429: resender.resend(message, includeAckRequested);
430: includeAckRequested = false;
431: }
432: } finally {
433: attempted();
434: }
435: }
436:
437: /**
438: * @return number of resend attempts
439: */
440: protected int getResends() {
441: return resends;
442: }
443:
444: /**
445: * @return date of next resend
446: */
447: protected Date getNext() {
448: return next;
449: }
450:
451: /**
452: * @return if resend attempt is pending
453: */
454: protected synchronized boolean isPending() {
455: return pending;
456: }
457:
458: /**
459: * ACK has been received for this candidate.
460: */
461: protected synchronized void resolved() {
462: pending = false;
463: next = null;
464: if (null != nextTask) {
465: nextTask.cancel();
466: }
467: }
468:
469: /**
470: * Cancel further resend (although no ACK has been received).
471: */
472: protected void cancel() {
473: if (null != nextTask) {
474: nextTask.cancel();
475: }
476: }
477:
478: /**
479: * @return associated message context
480: */
481: protected Message getMessage() {
482: return message;
483: }
484:
485: /**
486: * A resend has been attempted. Schedule the next attempt.
487: */
488: protected synchronized void attempted() {
489: pending = false;
490: resends++;
491: if (null != next) {
492: next = new Date(next.getTime() + nextInterval);
493: nextInterval *= backoff;
494: schedule();
495: }
496: }
497:
498: protected final synchronized void schedule() {
499: if (null == manager.getTimer()) {
500: return;
501: }
502: class ResendTask extends TimerTask {
503: ResendCandidate candidate;
504:
505: ResendTask(ResendCandidate c) {
506: candidate = c;
507: }
508:
509: @Override
510: public void run() {
511: if (!candidate.isPending()) {
512: candidate.initiate(includeAckRequested);
513: }
514: }
515: }
516: nextTask = new ResendTask(this );
517: try {
518: manager.getTimer().schedule(nextTask, next);
519: } catch (IllegalStateException ex) {
520: LOG
521: .log(Level.WARNING,
522: "SCHEDULE_RESEND_FAILED_MSG", ex);
523: }
524: }
525: }
526:
527: /**
528: * Encapsulates actual resend logic (pluggable to facilitate unit testing)
529: */
530: public interface Resender {
531: /**
532: * Resend mechanics.
533: *
534: * @param context the cloned message context.
535: * @param if a AckRequest should be included
536: */
537: void resend(Message message, boolean requestAcknowledge);
538: }
539:
540: /**
541: * Create default Resender logic.
542: *
543: * @return default Resender
544: */
545: protected final Resender getDefaultResender() {
546: return new Resender() {
547: public void resend(Message message,
548: boolean requestAcknowledge) {
549: RMProperties properties = RMContextUtils
550: .retrieveRMProperties(message, true);
551: SequenceType st = properties.getSequence();
552: if (st != null) {
553: LOG.log(Level.INFO, "RESEND_MSG", st
554: .getMessageNumber());
555: }
556: try {
557: // TODO: remove previously added acknowledgments and update
558: // message id (to avoid duplicates)
559:
560: if (MessageUtils.isRequestor(message)) {
561: clientResend(message);
562: } else {
563: serverResend(message);
564: }
565: } catch (Exception e) {
566: LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e);
567: }
568: }
569: };
570: }
571:
572: /**
573: * Plug in replacement resend logic (facilitates unit testing).
574: *
575: * @param replacement resend logic
576: */
577: protected void replaceResender(Resender replacement) {
578: resender = replacement;
579: }
580:
581: @SuppressWarnings("unchecked")
582: protected JaxbAssertion<RMAssertion> getAssertion(AssertionInfo ai) {
583: return (JaxbAssertion<RMAssertion>) ai.getAssertion();
584: }
585:
586: }
|