001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.math.BigInteger;
021: import java.util.ArrayList;
022: import java.util.Collection;
023: import java.util.List;
024: import java.util.TimerTask;
025: import java.util.logging.Level;
026: import java.util.logging.Logger;
027:
028: import org.apache.cxf.common.logging.LogUtils;
029: import org.apache.cxf.message.Message;
030: import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
031: import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
032: import org.apache.cxf.ws.rm.manager.AcksPolicyType;
033: import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
034: import org.apache.cxf.ws.rm.persistence.RMStore;
035: import org.apache.cxf.ws.rm.policy.PolicyUtils;
036: import org.apache.cxf.ws.rm.policy.RMAssertion;
037: import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
038: import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
039:
040: public class DestinationSequence extends AbstractSequence {
041:
042: private static final Logger LOG = LogUtils
043: .getL7dLogger(DestinationSequence.class);
044:
045: private Destination destination;
046: private EndpointReferenceType acksTo;
047: private BigInteger lastMessageNumber;
048: private SequenceMonitor monitor;
049: private boolean acknowledgeOnNextOccasion;
050: private List<DeferredAcknowledgment> deferredAcknowledgments;
051: private SequenceTermination scheduledTermination;
052: private String correlationID;
053:
054: public DestinationSequence(Identifier i, EndpointReferenceType a,
055: Destination d) {
056: this (i, a, null, null);
057: destination = d;
058: }
059:
060: public DestinationSequence(Identifier i, EndpointReferenceType a,
061: BigInteger lmn, SequenceAcknowledgement ac) {
062: super (i);
063: acksTo = a;
064: lastMessageNumber = lmn;
065: acknowledgement = ac;
066: if (null == acknowledgement) {
067: acknowledgement = RMUtils.getWSRMFactory()
068: .createSequenceAcknowledgement();
069: acknowledgement.setIdentifier(id);
070: }
071: monitor = new SequenceMonitor();
072: }
073:
074: /**
075: * @return the acksTo address for the sequence
076: */
077: public EndpointReferenceType getAcksTo() {
078: return acksTo;
079: }
080:
081: /**
082: * @return the message number of the last message or null if the last message had not been received.
083: */
084: public BigInteger getLastMessageNumber() {
085: return lastMessageNumber;
086: }
087:
088: /**
089: * @return the sequence acknowledgement presenting the sequences thus far received by a destination
090: */
091: public SequenceAcknowledgement getAcknowledgment() {
092: return acknowledgement;
093: }
094:
095: /**
096: * @return the identifier of the rm destination
097: */
098: public String getEndpointIdentifier() {
099: return destination.getName();
100: }
101:
102: public void acknowledge(Message message) throws SequenceFault {
103: SequenceType st = RMContextUtils.retrieveRMProperties(message,
104: false).getSequence();
105: BigInteger messageNumber = st.getMessageNumber();
106: LOG.fine("Acknowledging message: " + messageNumber);
107: if (null != lastMessageNumber
108: && messageNumber.compareTo(lastMessageNumber) > 0) {
109: throw new SequenceFaultFactory()
110: .createLastMessageNumberExceededFault(st
111: .getIdentifier());
112: }
113:
114: monitor.acknowledgeMessage();
115:
116: synchronized (this ) {
117: boolean done = false;
118: int i = 0;
119: for (; i < acknowledgement.getAcknowledgementRange().size(); i++) {
120: AcknowledgementRange r = acknowledgement
121: .getAcknowledgementRange().get(i);
122: if (r.getLower().compareTo(messageNumber) <= 0
123: && r.getUpper().compareTo(messageNumber) >= 0) {
124: done = true;
125: break;
126: } else {
127: BigInteger diff = r.getLower().subtract(
128: messageNumber);
129: if (diff.signum() == 1) {
130: if (diff.equals(BigInteger.ONE)) {
131: r.setLower(messageNumber);
132: done = true;
133: }
134: break;
135: } else if (messageNumber.subtract(r.getUpper())
136: .equals(BigInteger.ONE)) {
137: r.setUpper(messageNumber);
138: done = true;
139: break;
140: }
141: }
142: }
143:
144: if (!done) {
145: AcknowledgementRange range = RMUtils
146: .getWSRMFactory()
147: .createSequenceAcknowledgementAcknowledgementRange();
148: range.setLower(messageNumber);
149: range.setUpper(messageNumber);
150: acknowledgement.getAcknowledgementRange().add(i, range);
151: }
152: mergeRanges();
153: notifyAll();
154: }
155:
156: purgeAcknowledged(messageNumber);
157:
158: RMAssertion rma = PolicyUtils.getRMAssertion(destination
159: .getManager().getRMAssertion(), message);
160: long acknowledgementInterval = 0;
161: AcknowledgementInterval ai = rma.getAcknowledgementInterval();
162: if (null != ai) {
163: BigInteger val = ai.getMilliseconds();
164: if (null != val) {
165: acknowledgementInterval = val.longValue();
166: }
167: }
168:
169: scheduleAcknowledgement(acknowledgementInterval);
170:
171: long inactivityTimeout = 0;
172: InactivityTimeout iat = rma.getInactivityTimeout();
173: if (null != iat) {
174: BigInteger val = iat.getMilliseconds();
175: if (null != val) {
176: inactivityTimeout = val.longValue();
177: }
178: }
179: scheduleSequenceTermination(inactivityTimeout);
180:
181: }
182:
183: void mergeRanges() {
184: List<AcknowledgementRange> ranges = acknowledgement
185: .getAcknowledgementRange();
186: for (int i = ranges.size() - 1; i > 0; i--) {
187: AcknowledgementRange current = ranges.get(i);
188: AcknowledgementRange previous = ranges.get(i - 1);
189: if (current.getLower().subtract(previous.getUpper())
190: .equals(BigInteger.ONE)) {
191: previous.setUpper(current.getUpper());
192: ranges.remove(i);
193: }
194: }
195: }
196:
197: void setDestination(Destination d) {
198: destination = d;
199: }
200:
201: Destination getDestination() {
202: return destination;
203: }
204:
205: /**
206: * Returns the monitor for this sequence.
207: *
208: * @return the sequence monitor.
209: */
210: SequenceMonitor getMonitor() {
211: return monitor;
212: }
213:
214: void setLastMessageNumber(BigInteger lmn) {
215: lastMessageNumber = lmn;
216: }
217:
218: boolean canPiggybackAckOnPartialResponse() {
219: // TODO: should also check if we allow breaking the WI Profile rule by which no headers
220: // can be included in a HTTP response
221: return getAcksTo().getAddress().getValue().equals(
222: RMConstants.getAnonymousAddress());
223: }
224:
225: /**
226: * Ensures that the delivery assurance is honored, e.g. by throwing an
227: * exception if the message had already been delivered and the delivery
228: * assurance is AtMostOnce.
229: * This method blocks in case the delivery assurance is
230: * InOrder and and not all messages with lower message numbers have been
231: * delivered.
232: *
233: * @param s the SequenceType object including identifier and message number
234: * @throws Fault if message had already been acknowledged
235: */
236: void applyDeliveryAssurance(BigInteger mn) throws RMException {
237: DeliveryAssuranceType da = destination.getManager()
238: .getDeliveryAssurance();
239: if (da.isSetAtMostOnce() && isAcknowledged(mn)) {
240: org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
241: "MESSAGE_ALREADY_DELIVERED_EXC", LOG, mn,
242: getIdentifier().getValue());
243: LOG.log(Level.SEVERE, msg.toString());
244: throw new RMException(msg);
245: }
246: if (da.isSetInOrder() && da.isSetAtLeastOnce()) {
247: synchronized (this ) {
248: boolean ok = allPredecessorsAcknowledged(mn);
249: while (!ok) {
250: try {
251: wait();
252: ok = allPredecessorsAcknowledged(mn);
253: } catch (InterruptedException ie) {
254: // ignore
255: }
256: }
257: }
258: }
259: }
260:
261: synchronized boolean allPredecessorsAcknowledged(BigInteger mn) {
262: return acknowledgement.getAcknowledgementRange().size() == 1
263: && acknowledgement.getAcknowledgementRange().get(0)
264: .getLower().equals(BigInteger.ONE)
265: && acknowledgement.getAcknowledgementRange().get(0)
266: .getUpper().subtract(mn).signum() >= 0;
267: }
268:
269: void purgeAcknowledged(BigInteger messageNr) {
270: RMStore store = destination.getManager().getStore();
271: if (null == store) {
272: return;
273: }
274: Collection<BigInteger> messageNrs = new ArrayList<BigInteger>();
275: messageNrs.add(messageNr);
276: store.removeMessages(getIdentifier(), messageNrs, false);
277: }
278:
279: /**
280: * Called after an acknowledgement header for this sequence has been added to an outgoing message.
281: */
282: void acknowledgmentSent() {
283: acknowledgeOnNextOccasion = false;
284: }
285:
286: public boolean sendAcknowledgement() {
287: return acknowledgeOnNextOccasion;
288: }
289:
290: /**
291: * The correlation of the incoming CreateSequence call used to create this
292: * sequence is recorded so that in the absence of an offer, the corresponding
293: * outgoing CreateSeqeunce can be correlated.
294: */
295: void setCorrelationID(String cid) {
296: correlationID = cid;
297: }
298:
299: String getCorrelationID() {
300: return correlationID;
301: }
302:
303: void scheduleAcknowledgement(long acknowledgementInterval) {
304: AcksPolicyType ap = destination.getManager()
305: .getDestinationPolicy().getAcksPolicy();
306:
307: if (acknowledgementInterval > 0
308: && getMonitor().getMPM() >= ap
309: .getIntraMessageThreshold()) {
310: LOG.fine("Schedule deferred acknowledgment");
311: scheduleDeferredAcknowledgement(acknowledgementInterval);
312: } else {
313: LOG.fine("Schedule immediate acknowledgment");
314: scheduleImmediateAcknowledgement();
315: }
316: }
317:
318: void scheduleImmediateAcknowledgement() {
319: acknowledgeOnNextOccasion = true;
320: }
321:
322: synchronized void scheduleSequenceTermination(long inactivityTimeout) {
323: if (inactivityTimeout <= 0) {
324: return;
325: }
326: boolean scheduled = null != scheduledTermination;
327: if (null == scheduledTermination) {
328: scheduledTermination = new SequenceTermination();
329: }
330: scheduledTermination.updateInactivityTimeout(inactivityTimeout);
331: if (!scheduled) {
332: destination.getManager().getTimer().schedule(
333: scheduledTermination, inactivityTimeout);
334: }
335: }
336:
337: synchronized void scheduleDeferredAcknowledgement(long delay) {
338:
339: if (null == deferredAcknowledgments) {
340: deferredAcknowledgments = new ArrayList<DeferredAcknowledgment>();
341: }
342: long now = System.currentTimeMillis();
343: long expectedExecutionTime = now + delay;
344: for (DeferredAcknowledgment da : deferredAcknowledgments) {
345: if (da.scheduledExecutionTime() <= expectedExecutionTime) {
346: return;
347: }
348: }
349: DeferredAcknowledgment da = new DeferredAcknowledgment();
350: deferredAcknowledgments.add(da);
351: destination.getManager().getTimer().schedule(da, delay);
352: LOG.fine("Scheduled acknowledgment to be sent in " + delay
353: + " ms");
354: }
355:
356: synchronized void cancelDeferredAcknowledgments() {
357: if (null == deferredAcknowledgments) {
358: return;
359: }
360: for (int i = deferredAcknowledgments.size() - 1; i >= 0; i--) {
361: DeferredAcknowledgment da = deferredAcknowledgments.get(i);
362: da.cancel();
363: }
364: }
365:
366: synchronized void cancelTermination() {
367: if (null != scheduledTermination) {
368: scheduledTermination.cancel();
369: }
370: }
371:
372: final class DeferredAcknowledgment extends TimerTask {
373:
374: public void run() {
375: LOG.fine("timer task: send acknowledgment.");
376: DestinationSequence.this .scheduleImmediateAcknowledgement();
377:
378: try {
379: RMEndpoint rme = destination.getReliableEndpoint();
380: Proxy proxy = rme.getProxy();
381: proxy.acknowledge(DestinationSequence.this );
382: } catch (RMException ex) {
383: // already logged
384: } finally {
385: synchronized (DestinationSequence.this ) {
386: DestinationSequence.this .deferredAcknowledgments
387: .remove(this );
388: }
389:
390: }
391:
392: }
393: }
394:
395: final class SequenceTermination extends TimerTask {
396:
397: private long maxInactivityTimeout;
398:
399: void updateInactivityTimeout(long timeout) {
400: maxInactivityTimeout = Math.max(maxInactivityTimeout,
401: timeout);
402: }
403:
404: public void run() {
405: synchronized (DestinationSequence.this ) {
406: DestinationSequence.this .scheduledTermination = null;
407: RMEndpoint rme = destination.getReliableEndpoint();
408: long lat = Math.max(rme.getLastControlMessage(), rme
409: .getLastApplicationMessage());
410: if (0 == lat) {
411: return;
412: }
413: long now = System.currentTimeMillis();
414: if (now - lat >= maxInactivityTimeout) {
415:
416: // terminate regardless outstanding acknowledgments - as we assume that the client is
417: // gone there is no point in sending a SequenceAcknowledgment
418:
419: LogUtils.log(LOG, Level.WARNING,
420: "TERMINATING_INACTIVE_SEQ_MSG",
421: DestinationSequence.this .getIdentifier()
422: .getValue());
423: DestinationSequence.this .destination
424: .removeSequence(DestinationSequence.this );
425:
426: } else {
427: // reschedule
428: SequenceTermination st = new SequenceTermination();
429: st.updateInactivityTimeout(maxInactivityTimeout);
430: DestinationSequence.this.destination.getManager()
431: .getTimer().schedule(st,
432: maxInactivityTimeout);
433: }
434: }
435: }
436: }
437:
438: }
|