001: package org.objectweb.celtix.bus.ws.rm;
002:
003: import java.math.BigInteger;
004: import java.util.Date;
005: import java.util.logging.Level;
006: import java.util.logging.Logger;
007:
008: import javax.xml.datatype.DatatypeConfigurationException;
009: import javax.xml.datatype.DatatypeFactory;
010: import javax.xml.datatype.Duration;
011:
012: import org.objectweb.celtix.bus.configuration.wsrm.SequenceTerminationPolicyType;
013: import org.objectweb.celtix.bus.ws.addressing.ContextUtils;
014: import org.objectweb.celtix.common.logging.LogUtils;
015: import org.objectweb.celtix.ws.rm.Expires;
016: import org.objectweb.celtix.ws.rm.Identifier;
017: import org.objectweb.celtix.ws.rm.SequenceAcknowledgement;
018: import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
019: import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence;
020:
021: public class SourceSequence extends AbstractSequenceImpl implements
022: RMSourceSequence {
023:
024: public static final Duration PT0S;
025: private static final Logger LOG = LogUtils
026: .getL7dLogger(SourceSequence.class);
027:
028: private SequenceAcknowledgement acked;
029:
030: private Date expires;
031: private RMSource source;
032: private BigInteger currentMessageNumber;
033: private boolean lastMessage;
034: private Identifier offeringId;
035: private org.objectweb.celtix.ws.addressing.EndpointReferenceType target;
036:
037: static {
038: Duration pt0s = null;
039: try {
040: DatatypeFactory df = DatatypeFactory.newInstance();
041: pt0s = df.newDuration("PT0S");
042: } catch (DatatypeConfigurationException ex) {
043: LOG
044: .log(Level.INFO,
045: "Could not create Duration object.", ex);
046: }
047: PT0S = pt0s;
048: }
049:
050: public SourceSequence(Identifier i) {
051: this (i, null, null);
052: }
053:
054: public SourceSequence(Identifier i, Date e, Identifier oi) {
055: this (i, e, oi, BigInteger.ZERO, false);
056: }
057:
058: public SourceSequence(Identifier i, Date e, Identifier oi,
059: BigInteger cmn, boolean lm) {
060: super (i);
061: expires = e;
062:
063: offeringId = oi;
064:
065: currentMessageNumber = cmn;
066: lastMessage = lm;
067: acked = RMUtils.getWSRMFactory()
068: .createSequenceAcknowledgement();
069: acked.setIdentifier(id);
070: }
071:
072: // begin RMSourceSequence interface
073:
074: public BigInteger getCurrentMessageNr() {
075: return currentMessageNumber;
076: }
077:
078: /**
079: * @return the identifier of the rm source
080: */
081: public String getEndpointIdentifier() {
082: if (null != source) {
083: return source.getEndpointId();
084: }
085: return null;
086: }
087:
088: public Date getExpiry() {
089: return expires;
090: }
091:
092: public boolean isLastMessage() {
093: return lastMessage;
094: }
095:
096: public Identifier getOfferingSequenceIdentifier() {
097: return offeringId;
098: }
099:
100: // end RMSourceSequence interface
101:
102: void setSource(RMSource s) {
103: source = s;
104: }
105:
106: void setLastMessage(boolean lm) {
107: lastMessage = lm;
108: }
109:
110: /**
111: * Returns true if this sequence was constructed from an offer for an inbound sequence
112: * includes in the CreateSequenceRequest in response to which the sequence with
113: * the specified identifier was created.
114: *
115: * @param id the sequence identifier
116: * @return true if the sequence was constructed from an offer.
117: */
118: boolean offeredBy(Identifier sid) {
119: return null != offeringId
120: && offeringId.getValue().equals(sid.getValue());
121: }
122:
123: /**
124: * Returns true if the sequence is expired.
125: *
126: * @return true if the sequence is expired.
127: */
128:
129: boolean isExpired() {
130: return expires == null ? false : new Date().after(expires);
131: }
132:
133: void setExpires(Expires ex) {
134: Duration d = null;
135: if (null != ex) {
136: d = ex.getValue();
137: }
138:
139: if (null != d && (null == PT0S || !PT0S.equals(d))) {
140: Date now = new Date();
141: expires = new Date(now.getTime()
142: + ex.getValue().getTimeInMillis(now));
143: }
144: }
145:
146: /**
147: * Returns the next message number and increases the message number.
148: *
149: * @return the next message number.
150: */
151: BigInteger nextMessageNumber() {
152: return nextMessageNumber(null, null);
153: }
154:
155: /**
156: * Returns the next message number and increases the message number.
157: * The parameters, if not null, indicate that this message is being sent as a response
158: * to the message with the specified message number in the sequence specified by the
159: * by the identifier, and are used to decide if this message should be the last in
160: * this sequence.
161: *
162: * @return the next message number.
163: */
164: BigInteger nextMessageNumber(Identifier inSeqId,
165: BigInteger inMsgNumber) {
166:
167: assert !lastMessage;
168:
169: BigInteger result = null;
170: synchronized (this ) {
171: currentMessageNumber = currentMessageNumber
172: .add(BigInteger.ONE);
173: checkLastMessage(inSeqId, inMsgNumber);
174: result = currentMessageNumber;
175: }
176: return result;
177: }
178:
179: void nextAndLastMessageNumber() {
180: assert !lastMessage;
181:
182: synchronized (this ) {
183: currentMessageNumber = currentMessageNumber
184: .add(BigInteger.ONE);
185: lastMessage = true;
186: }
187: }
188:
189: /**
190: * Used by the RM source to cache received acknowledgements for this
191: * sequence.
192: *
193: * @param acknowledgement an acknowledgement for this sequence
194: */
195: void setAcknowledged(SequenceAcknowledgement acknowledgement) {
196: acked = acknowledgement;
197: }
198:
199: SequenceAcknowledgement getAcknowledgement() {
200: return acked;
201: }
202:
203: /**
204: * Checks if the message with the given number has been acknowledged.
205: *
206: * @param m the message number
207: * @return true of the message with the given number has been acknowledged.
208: */
209: boolean isAcknowledged(BigInteger m) {
210: for (AcknowledgementRange r : acked.getAcknowledgementRange()) {
211: if (m.subtract(r.getLower()).signum() >= 0
212: && r.getUpper().subtract(m).signum() >= 0) {
213: return true;
214: }
215: }
216: return false;
217: }
218:
219: /**
220: * Returns true if a last message had been sent for this sequence and if all
221: * messages for this sequence have been acknowledged.
222: *
223: * @return true if all messages have been acknowledged.
224: */
225: boolean allAcknowledged() {
226: if (!lastMessage) {
227: return false;
228: }
229:
230: if (acked.getAcknowledgementRange().size() == 1) {
231: AcknowledgementRange r = acked.getAcknowledgementRange()
232: .get(0);
233: return r.getLower().equals(BigInteger.ONE)
234: && r.getUpper().equals(currentMessageNumber);
235: }
236: return false;
237: }
238:
239: /**
240: * The target for the sequence is the first non-anonymous address that
241: * a message is sent to as part of this sequence. It is subsequently used
242: * for as the target of out-of-band protocol messages related to that
243: * sequence that originate from the sequnce source (i.e. TerminateSequence
244: * and LastMessage, but not AckRequested or SequenceAcknowledgement as these
245: * are orignate from the sequence destination).
246: *
247: * @param to
248: */
249: synchronized void setTarget(
250: org.objectweb.celtix.ws.addressing.EndpointReferenceType to) {
251: if (target == null && !ContextUtils.isGenericAddress(to)) {
252: target = to;
253: }
254: }
255:
256: synchronized org.objectweb.celtix.ws.addressing.EndpointReferenceType getTarget() {
257: return target;
258: }
259:
260: /**
261: * Checks if the current message should be the last message in this sequence
262: * and if so sets the lastMessageNumber property.
263: */
264: private void checkLastMessage(Identifier inSeqId,
265: BigInteger inMsgNumber) {
266:
267: assert null != source;
268:
269: // check if this is a response to a message that was is the last message in the sequence
270: // that included this sequence as an offer
271:
272: if (null != inSeqId && null != inMsgNumber) {
273: DestinationSequence inSeq = source.getHandler()
274: .getDestination().getSequence(inSeqId);
275: if (null != inSeq && offeredBy(inSeqId)
276: && inMsgNumber.equals(inSeq.getLastMessageNr())) {
277: lastMessage = true;
278: }
279: }
280:
281: if (!lastMessage) {
282: SequenceTerminationPolicyType stp = source
283: .getSequenceTerminationPolicy();
284: assert null != stp;
285:
286: if ((!stp.getMaxLength().equals(BigInteger.ZERO) && stp
287: .getMaxLength().compareTo(currentMessageNumber) <= 0)
288: || (stp.getMaxRanges() > 0 && acked
289: .getAcknowledgementRange().size() >= stp
290: .getMaxRanges())
291: || (stp.getMaxUnacknowledged() > 0 && source
292: .getRetransmissionQueue()
293: .countUnacknowledged(this ) >= stp
294: .getMaxUnacknowledged())) {
295: lastMessage = true;
296: }
297: }
298:
299: if (LOG.isLoggable(Level.FINE) && lastMessage) {
300: LOG.fine(currentMessageNumber
301: + " should be the last message in this sequence.");
302: }
303: }
304:
305: }
|