001: package org.objectweb.celtix.bus.ws.rm;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.math.BigInteger;
006: import java.util.ArrayList;
007: import java.util.List;
008: import java.util.TimerTask;
009: import java.util.logging.Level;
010: import java.util.logging.Logger;
011:
012: import org.objectweb.celtix.bus.configuration.wsrm.AcksPolicyType;
013: import org.objectweb.celtix.common.i18n.Message;
014: import org.objectweb.celtix.common.logging.LogUtils;
015: import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType;
016: import org.objectweb.celtix.ws.rm.Identifier;
017: import org.objectweb.celtix.ws.rm.SequenceAcknowledgement;
018: import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
019: import org.objectweb.celtix.ws.rm.SequenceFaultType;
020: import org.objectweb.celtix.ws.rm.persistence.RMDestinationSequence;
021: import org.objectweb.celtix.ws.rm.policy.RMAssertionType;
022: import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
023:
024: public class DestinationSequence extends AbstractSequenceImpl implements
025: RMDestinationSequence {
026:
027: private static final Logger LOG = LogUtils
028: .getL7dLogger(DestinationSequence.class);
029:
030: private SequenceAcknowledgement acked;
031:
032: private RMDestination destination;
033: private EndpointReferenceType acksTo;
034: private BigInteger lastMessageNumber;
035: private SequenceMonitor monitor;
036: private boolean acknowledgeOnNextOccasion;
037: private List<DeferredAcknowledgment> deferredAcknowledgments;
038: private String correlationID;
039:
040: public DestinationSequence(Identifier i, EndpointReferenceType a,
041: RMDestination d) {
042: this (i, a, null, null);
043: setDestination(d);
044: }
045:
046: public DestinationSequence(Identifier i, EndpointReferenceType a,
047: BigInteger lmn, SequenceAcknowledgement ac) {
048: super (i);
049: acksTo = a;
050: lastMessageNumber = lmn;
051: acked = ac;
052: if (null == acked) {
053: acked = RMUtils.getWSRMFactory()
054: .createSequenceAcknowledgement();
055: acked.setIdentifier(id);
056: }
057: monitor = new SequenceMonitor();
058: }
059:
060: // RMDestinationSequence interface
061:
062: /**
063: * @return the acksTo address for the sequence
064: */
065: public EndpointReferenceType getAcksTo() {
066: return acksTo;
067: }
068:
069: /**
070: * @return the message number of the last message or null if the last message had not been received.
071: */
072: public BigInteger getLastMessageNr() {
073: return lastMessageNumber;
074: }
075:
076: /**
077: * @return the sequence acknowledgement presenting the sequences thus far received by a destination
078: */
079: public SequenceAcknowledgement getAcknowledgment() {
080: return acked;
081: }
082:
083: /**
084: * @return the sequence acknowledgement presenting the sequences thus far received by a destination
085: * as an input stream
086: */
087: public InputStream getAcknowledgmentAsStream() {
088: return RMUtils.getPersistenceUtils()
089: .getAcknowledgementAsInputStream(acked);
090: }
091:
092: /**
093: * @return the identifier of the rm destination
094: */
095: public String getEndpointIdentifier() {
096: if (null != destination) {
097: return destination.getEndpointId();
098: }
099: return null;
100: }
101:
102: // end RMDestinationSequence interface
103:
104: final void setDestination(RMDestination d) {
105: destination = d;
106: }
107:
108: RMDestination getDestination() {
109: return destination;
110: }
111:
112: void setLastMessageNumber(BigInteger lmn) {
113: lastMessageNumber = lmn;
114: }
115:
116: /**
117: * Returns the monitor for this sequence.
118: *
119: * @return the sequence monitor.
120: */
121: SequenceMonitor getMonitor() {
122: return monitor;
123: }
124:
125: /**
126: * Called by the RM destination upon receipt of a message with the given
127: * message number for this sequence.
128: *
129: * @param messageNumber the number of the received message
130: * @param lastMessage true if this is to be the last message in the sequence
131: */
132: void acknowledge(BigInteger messageNumber) throws SequenceFault {
133:
134: if (null != lastMessageNumber
135: && messageNumber.compareTo(lastMessageNumber) > 0) {
136: SequenceFaultType sf = RMUtils.getWSRMFactory()
137: .createSequenceFaultType();
138: sf.setFaultCode(RMUtils.getRMConstants()
139: .getLastMessageNumberExceededFaultCode());
140: Message msg = new Message(
141: "LAST_MESSAGE_NUMBER_EXCEEDED_EXC", LOG, this );
142: throw new SequenceFault(msg.toString(), sf);
143: }
144:
145: monitor.acknowledgeMessage();
146:
147: boolean done = false;
148: int i = 0;
149: for (; i < acked.getAcknowledgementRange().size(); i++) {
150: AcknowledgementRange r = acked.getAcknowledgementRange()
151: .get(i);
152: if (r.getLower().compareTo(messageNumber) <= 0
153: && r.getUpper().compareTo(messageNumber) >= 0) {
154: done = true;
155: break;
156: } else {
157: BigInteger diff = r.getLower().subtract(messageNumber);
158: if (diff.signum() == 1) {
159: if (diff.equals(BigInteger.ONE)) {
160: r.setLower(messageNumber);
161: done = true;
162: }
163: break;
164: } else if (messageNumber.subtract(r.getUpper()).equals(
165: BigInteger.ONE)) {
166: r.setUpper(messageNumber);
167: done = true;
168: break;
169: }
170: }
171: }
172:
173: if (!done) {
174: AcknowledgementRange range = RMUtils
175: .getWSRMFactory()
176: .createSequenceAcknowledgementAcknowledgementRange();
177: range.setLower(messageNumber);
178: range.setUpper(messageNumber);
179: acked.getAcknowledgementRange().add(i, range);
180: }
181:
182: scheduleAcknowledgement();
183: }
184:
185: /**
186: * Called after an acknowledgement header for this sequence has been added to an outgoing message.
187: */
188: void acknowledgmentSent() {
189: acknowledgeOnNextOccasion = false;
190: }
191:
192: boolean sendAcknowledgement() {
193: return acknowledgeOnNextOccasion;
194: }
195:
196: /**
197: * The correlation of the incoming CreateSequence call used to create this
198: * sequence is recorded so that in the absence of an offer, the corresponding
199: * outgoing CreateSeqeunce can be correlated.
200: */
201: void setCorrelationID(String cid) {
202: correlationID = cid;
203: }
204:
205: String getCorrelationID() {
206: return correlationID;
207: }
208:
209: boolean canPiggybackAckOnPartialResponse() {
210: // TODO: should also check if we allow breaking the WI Profile rule by which no headers
211: // can be included in a HTTP response
212: return getAcksTo().getAddress().getValue().equals(
213: Names.WSA_ANONYMOUS_ADDRESS);
214: }
215:
216: static SequenceFault createUnknownSequenceFault(Identifier sid) {
217: SequenceFaultType sf = RMUtils.getWSRMFactory()
218: .createSequenceFaultType();
219: sf.setFaultCode(RMUtils.getRMConstants()
220: .getUnknownSequenceFaultCode());
221: Message msg = new Message("UNKNOWN_SEQUENCE_EXC", LOG, sid
222: .getValue());
223: return new SequenceFault(msg.toString(), sf);
224: }
225:
226: private void scheduleAcknowledgement() {
227: RMAssertionType rma = destination.getRMAssertion();
228: int delay = 0;
229: if (null != rma.getAcknowledgementInterval()) {
230: delay = rma.getAcknowledgementInterval().getMilliseconds()
231: .intValue();
232: }
233: AcksPolicyType ap = destination.getAcksPolicy();
234: if (delay > 0
235: && getMonitor().getMPM() >= ap
236: .getIntraMessageThreshold()) {
237: scheduleDeferredAcknowledgement(delay);
238: } else {
239: scheduleImmediateAcknowledgement();
240: }
241: }
242:
243: void scheduleImmediateAcknowledgement() {
244: acknowledgeOnNextOccasion = true;
245: }
246:
247: private void scheduleDeferredAcknowledgement(int delay) {
248: if (null == deferredAcknowledgments) {
249: deferredAcknowledgments = new ArrayList<DeferredAcknowledgment>();
250: }
251: long now = System.currentTimeMillis();
252: long expectedExecutionTime = now + delay;
253: for (DeferredAcknowledgment da : deferredAcknowledgments) {
254: if (da.scheduledExecutionTime() <= expectedExecutionTime) {
255: return;
256: }
257: }
258: DeferredAcknowledgment da = new DeferredAcknowledgment();
259: deferredAcknowledgments.add(da);
260: destination.getHandler().getTimer().schedule(da, delay);
261: }
262:
263: final class DeferredAcknowledgment extends TimerTask {
264:
265: public void run() {
266: DestinationSequence.this .scheduleImmediateAcknowledgement();
267: try {
268: destination.getHandler().getProxy().acknowledge(
269: DestinationSequence.this );
270: } catch (IOException ex) {
271: Message msg = new Message("SEQ_ACK_SEND_EXC", LOG,
272: DestinationSequence.this);
273: LOG.log(Level.SEVERE, msg.toString(), ex);
274: }
275: }
276: }
277: }
|