001: package org.objectweb.celtix.bus.ws.rm;
002:
003: import java.io.IOException;
004: import java.util.logging.Level;
005: import java.util.logging.Logger;
006:
007: import javax.xml.datatype.Duration;
008:
009: import org.objectweb.celtix.bus.configuration.wsrm.DestinationPolicyType;
010: import org.objectweb.celtix.bus.ws.addressing.VersionTransformer;
011: import org.objectweb.celtix.common.logging.LogUtils;
012: import org.objectweb.celtix.ws.addressing.AddressingProperties;
013: import org.objectweb.celtix.ws.addressing.v200408.AttributedURI;
014: import org.objectweb.celtix.ws.rm.AcceptType;
015: import org.objectweb.celtix.ws.rm.CreateSequenceResponseType;
016: import org.objectweb.celtix.ws.rm.CreateSequenceType;
017: import org.objectweb.celtix.ws.rm.Expires;
018: import org.objectweb.celtix.ws.rm.Identifier;
019: import org.objectweb.celtix.ws.rm.OfferType;
020: import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
021:
022: public class RMServant {
023:
024: private static final Logger LOG = LogUtils
025: .getL7dLogger(RMServant.class);
026:
027: // REVISIT assumption there is only a single outstanding unattached Identifier
028: private Identifier unattachedIdentifier;
029:
030: public RMServant() {
031: }
032:
033: /**
034: * Called on the RM handler upon inbound processing a CreateSequence request.
035: *
036: * @param
037: * @return the CreateSequenceResponse.
038: */
039: public CreateSequenceResponseType createSequence(
040: RMDestination destination, CreateSequenceType cs,
041: AddressingProperties maps) throws SequenceFault {
042:
043: CreateSequenceResponseType csr = RMUtils.getWSRMFactory()
044: .createCreateSequenceResponseType();
045: csr.setIdentifier(destination.generateSequenceIdentifier());
046:
047: DestinationPolicyType dp = destination.getDestinationPolicies();
048: Duration supportedDuration = dp.getSequenceExpiration();
049: if (null == supportedDuration) {
050: supportedDuration = SourceSequence.PT0S;
051: }
052: Expires ex = cs.getExpires();
053: LOG.info("supported duration: " + supportedDuration);
054:
055: if (null != ex
056: || supportedDuration.isShorterThan(SourceSequence.PT0S)) {
057: Duration effectiveDuration = supportedDuration;
058: if (null != ex
059: && supportedDuration.isLongerThan(ex.getValue())) {
060: effectiveDuration = supportedDuration;
061: }
062: ex = RMUtils.getWSRMFactory().createExpires();
063: ex.setValue(effectiveDuration);
064: csr.setExpires(ex);
065: }
066:
067: OfferType offer = cs.getOffer();
068: if (null != offer) {
069: AcceptType accept = RMUtils.getWSRMFactory()
070: .createAcceptType();
071: if (dp.isAcceptOffers()) {
072: RMSource source = destination.getHandler().getSource();
073: LOG.fine("Accepting inbound sequence offer");
074: AttributedURI to = VersionTransformer.convert(maps
075: .getTo());
076: accept
077: .setAcksTo(RMUtils.createReference(to
078: .getValue()));
079: SourceSequence seq = new SourceSequence(offer
080: .getIdentifier(), null, csr.getIdentifier());
081: seq.setExpires(offer.getExpires());
082: seq.setTarget(VersionTransformer
083: .convert(cs.getAcksTo()));
084: source.addSequence(seq);
085: source.setCurrent(csr.getIdentifier(), seq);
086: LOG
087: .fine("Making offered sequence the current sequence for responses to "
088: + csr.getIdentifier().getValue());
089: } else {
090: LOG.fine("Refusing inbound sequence offer");
091: accept.setAcksTo(RMUtils
092: .createReference(Names.WSA_NONE_ADDRESS));
093: }
094: csr.setAccept(accept);
095: }
096:
097: DestinationSequence seq = new DestinationSequence(csr
098: .getIdentifier(), cs.getAcksTo(), destination);
099: seq.setCorrelationID(maps.getMessageID().getValue());
100: destination.addSequence(seq);
101:
102: return csr;
103: }
104:
105: public void createSequenceResponse(RMSource source,
106: CreateSequenceResponseType csr, Identifier offeredId) {
107: // moved from RMProxy.createSequence
108: // csr.getIdentifier() is the Identifier for the newly created sequence
109: SourceSequence seq = new SourceSequence(csr.getIdentifier());
110: seq.setExpires(csr.getExpires());
111: source.addSequence(seq);
112:
113: // the incoming sequence ID is either used as the requestor sequence
114: // (signalled by null) or associated with a corresponding sequence
115: // identifier
116: source.setCurrent(clearUnattachedIdentifier(), seq);
117:
118: // if a sequence was offered and accepted, then we can add this to
119: // to the local destination sequence list, otherwise we have to wait for
120: // and incoming CreateSequence request
121: if (null != offeredId) {
122: assert null != csr.getAccept();
123: RMDestination dest = source.getHandler().getDestination();
124: String address = csr.getAccept().getAcksTo().getAddress()
125: .getValue();
126: if (!RMUtils.getAddressingConstants().getNoneURI().equals(
127: address)) {
128: DestinationSequence ds = new DestinationSequence(
129: offeredId, csr.getAccept().getAcksTo(), dest);
130: dest.addSequence(ds);
131: }
132: }
133: }
134:
135: /**
136: * Checks if the terminated sequence was created in response to a createSequence
137: * request that included an offer for an inbound sequence which was accepted.
138: * the offering identifier is equal to the identifier of the sequence now terminated,
139: * and request termination of that sequence in turn.
140: *
141: * @param destination
142: * @param sid
143: * @throws SequenceFault
144: */
145: public void terminateSequence(RMDestination destination,
146: Identifier sid) throws SequenceFault {
147: // check if the terminated sequence was created in response to a a createSequence
148: // request
149:
150: DestinationSequence terminatedSeq = destination
151: .getSequence(sid);
152: if (null == terminatedSeq) {
153: LOG.severe("No such sequence.");
154: return;
155: }
156:
157: destination.removeSequence(terminatedSeq);
158:
159: // the following may be necessary if the last message for this sequence was a oneway
160: // request and hence there was no response to which a last message could have been added
161:
162: for (SourceSequence outboundSeq : destination.getHandler()
163: .getSource().getAllSequences()) {
164: if (outboundSeq.offeredBy(sid)
165: && !outboundSeq.isLastMessage()) {
166:
167: // send an out of band message with an empty body and a
168: // sequence header containing a lastMessage element.
169:
170: RMProxy proxy = destination.getHandler().getProxy();
171: try {
172: proxy.lastMessage(outboundSeq);
173: } catch (IOException ex) {
174: LOG.log(Level.SEVERE,
175: "Could not terminate correlated sequence.",
176: ex);
177: }
178:
179: break;
180: }
181: }
182:
183: }
184:
185: protected Identifier clearUnattachedIdentifier() {
186: Identifier ret = unattachedIdentifier;
187: unattachedIdentifier = null;
188: return ret;
189: }
190:
191: protected void setUnattachedIdentifier(Identifier i) {
192: unattachedIdentifier = i;
193: }
194: }
|