001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.math.BigInteger;
021: import java.util.Date;
022: import java.util.logging.Level;
023: import java.util.logging.Logger;
024:
025: import javax.xml.datatype.Duration;
026:
027: import org.apache.cxf.common.logging.LogUtils;
028: import org.apache.cxf.jaxb.DatatypeFactory;
029: import org.apache.cxf.ws.addressing.ContextUtils;
030: import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
031: import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
032:
033: public class SourceSequence extends AbstractSequence {
034:
035: private static final Logger LOG = LogUtils
036: .getL7dLogger(SourceSequence.class);
037:
038: private Date expires;
039: private Source source;
040: private BigInteger currentMessageNumber;
041: private boolean lastMessage;
042: private Identifier offeringId;
043: private org.apache.cxf.ws.addressing.EndpointReferenceType target;
044:
045: public SourceSequence(Identifier i) {
046: this (i, null, null);
047: }
048:
049: public SourceSequence(Identifier i, Date e, Identifier oi) {
050: this (i, e, oi, BigInteger.ZERO, false);
051: }
052:
053: public SourceSequence(Identifier i, Date e, Identifier oi,
054: BigInteger cmn, boolean lm) {
055: super (i);
056: expires = e;
057:
058: offeringId = oi;
059:
060: currentMessageNumber = cmn;
061: lastMessage = lm;
062: acknowledgement = RMUtils.getWSRMFactory()
063: .createSequenceAcknowledgement();
064: acknowledgement.setIdentifier(id);
065: }
066:
067: /**
068: * @return the message number assigned to the most recent outgoing
069: * application message.
070: */
071: public BigInteger getCurrentMessageNr() {
072: return currentMessageNumber;
073: }
074:
075: /**
076: * @return true if the last message had been sent for this sequence.
077: */
078: public boolean isLastMessage() {
079: return lastMessage;
080: }
081:
082: /**
083: * @return the identifier of the sequence that was created on behalf of the
084: * CreateSequence request that included this sequence as an offer
085: */
086: public Identifier getOfferingSequenceIdentifier() {
087: return offeringId;
088: }
089:
090: /**
091: * @return the identifier of the rm source
092: */
093: public String getEndpointIdentifier() {
094: return source.getName().toString();
095: }
096:
097: /**
098: * @return the expiry data of this sequence
099: */
100: public Date getExpires() {
101: return expires;
102: }
103:
104: /**
105: * Returns true if this sequence was constructed from an offer for an
106: * inbound sequence includes in the CreateSequenceRequest in response to
107: * which the sequence with the specified identifier was created.
108: *
109: * @param id the sequence identifier
110: * @return true if the sequence was constructed from an offer.
111: */
112: public boolean offeredBy(Identifier sid) {
113: return null != offeringId
114: && offeringId.getValue().equals(sid.getValue());
115: }
116:
117: /**
118: * Returns true if a last message had been sent for this sequence and if all
119: * messages for this sequence have been acknowledged.
120: *
121: * @return true if all messages have been acknowledged.
122: */
123: public boolean allAcknowledged() {
124: if (!lastMessage) {
125: return false;
126: }
127:
128: if (acknowledgement.getAcknowledgementRange().size() == 1) {
129: AcknowledgementRange r = acknowledgement
130: .getAcknowledgementRange().get(0);
131: return r.getLower().equals(BigInteger.ONE)
132: && r.getUpper().equals(currentMessageNumber);
133: }
134: return false;
135: }
136:
137: /**
138: * Used by the RM source to cache received acknowledgements for this
139: * sequence.
140: *
141: * @param acknowledgement an acknowledgement for this sequence
142: */
143: public void setAcknowledged(SequenceAcknowledgement a)
144: throws RMException {
145: acknowledgement = a;
146: source.getManager().getRetransmissionQueue().purgeAcknowledged(
147: this );
148: if (allAcknowledged()) {
149: RMEndpoint rme = source.getReliableEndpoint();
150: Proxy proxy = rme.getProxy();
151: proxy.terminate(this );
152: source.removeSequence(this );
153: }
154: }
155:
156: /**
157: * Returns the source associated with this source sequence.
158: *
159: * @return the source.
160: */
161: public Source getSource() {
162: return source;
163: }
164:
165: void setSource(Source s) {
166: source = s;
167: }
168:
169: void setLastMessage(boolean lm) {
170: lastMessage = lm;
171: }
172:
173: /**
174: * Returns true if the sequence is expired.
175: *
176: * @return true if the sequence is expired.
177: */
178:
179: boolean isExpired() {
180: return expires == null ? false : new Date().after(expires);
181:
182: }
183:
184: public void setExpires(Expires ex) {
185: Duration d = null;
186: expires = null;
187: if (null != ex) {
188: d = ex.getValue();
189: }
190:
191: if (null != d && !d.equals(DatatypeFactory.PT0S)) {
192: Date now = new Date();
193: expires = new Date(now.getTime()
194: + ex.getValue().getTimeInMillis(now));
195: }
196: }
197:
198: /**
199: * Returns the next message number and increases the message number.
200: *
201: * @return the next message number.
202: */
203: BigInteger nextMessageNumber() {
204: return nextMessageNumber(null, null, false);
205: }
206:
207: /**
208: * Returns the next message number and increases the message number. The
209: * parameters, if not null, indicate that this message is being sent as a
210: * response to the message with the specified message number in the sequence
211: * specified by the by the identifier, and are used to decide if this
212: * message should be the last in this sequence.
213: *
214: * @return the next message number.
215: */
216: public BigInteger nextMessageNumber(Identifier inSeqId,
217: BigInteger inMsgNumber, boolean last) {
218: assert !lastMessage;
219:
220: BigInteger result = null;
221: synchronized (this ) {
222: currentMessageNumber = currentMessageNumber
223: .add(BigInteger.ONE);
224: if (last) {
225: lastMessage = true;
226: } else {
227: checkLastMessage(inSeqId, inMsgNumber);
228: }
229: result = currentMessageNumber;
230: }
231: return result;
232: }
233:
234: SequenceAcknowledgement getAcknowledgement() {
235: return acknowledgement;
236: }
237:
238: /**
239: * The target for the sequence is the first non-anonymous address that a
240: * message is sent to as part of this sequence. It is subsequently used for
241: * as the target of out-of-band protocol messages related to that sequence
242: * that originate from the sequnce source (i.e. TerminateSequence and
243: * LastMessage, but not AckRequested or SequenceAcknowledgement as these are
244: * orignate from the sequence destination).
245: *
246: * @param to
247: */
248: synchronized void setTarget(
249: org.apache.cxf.ws.addressing.EndpointReferenceType to) {
250: if (target == null && !ContextUtils.isGenericAddress(to)) {
251: target = to;
252: }
253: }
254:
255: synchronized org.apache.cxf.ws.addressing.EndpointReferenceType getTarget() {
256: return target;
257: }
258:
259: /**
260: * Checks if the current message should be the last message in this sequence
261: * and if so sets the lastMessageNumber property.
262: */
263: private void checkLastMessage(Identifier inSeqId,
264: BigInteger inMsgNumber) {
265:
266: // check if this is a response to a message that was is the last message
267: // in the sequence
268: // that included this sequence as an offer
269:
270: if (null != inSeqId && null != inMsgNumber) {
271: Destination destination = source.getReliableEndpoint()
272: .getDestination();
273: DestinationSequence inSeq = null;
274: if (null != destination) {
275: inSeq = destination.getSequence(inSeqId);
276: }
277:
278: if (null != inSeq && offeredBy(inSeqId)
279: && inMsgNumber.equals(inSeq.getLastMessageNumber())) {
280: lastMessage = true;
281: }
282: }
283:
284: if (!lastMessage) {
285: SequenceTerminationPolicyType stp = source.getManager()
286: .getSourcePolicy().getSequenceTerminationPolicy();
287:
288: assert null != stp;
289:
290: if ((!stp.getMaxLength().equals(BigInteger.ZERO) && stp
291: .getMaxLength().compareTo(currentMessageNumber) <= 0)
292: || (stp.getMaxRanges() > 0 && acknowledgement
293: .getAcknowledgementRange().size() >= stp
294: .getMaxRanges())
295: || (stp.getMaxUnacknowledged() > 0 && source
296: .getManager().getRetransmissionQueue()
297: .countUnacknowledged(this ) >= stp
298: .getMaxUnacknowledged())) {
299: lastMessage = true;
300: }
301: }
302:
303: if (LOG.isLoggable(Level.FINE) && lastMessage) {
304: LOG.fine(currentMessageNumber
305: + " should be the last message in this sequence.");
306: }
307: }
308: }
|