001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.util.Collection;
021: import java.util.HashMap;
022: import java.util.Map;
023:
024: import org.apache.cxf.helpers.CastUtils;
025: import org.apache.cxf.message.Message;
026: import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
027: import org.apache.cxf.ws.rm.persistence.RMStore;
028:
029: public class Destination extends AbstractEndpoint {
030:
031: private Map<String, DestinationSequence> map;
032:
033: Destination(RMEndpoint reliableEndpoint) {
034: super (reliableEndpoint);
035: map = new HashMap<String, DestinationSequence>();
036: }
037:
038: public DestinationSequence getSequence(Identifier id) {
039: return map.get(id.getValue());
040: }
041:
042: public Collection<DestinationSequence> getAllSequences() {
043: return CastUtils.cast(map.values());
044: }
045:
046: public void addSequence(DestinationSequence seq) {
047: addSequence(seq, true);
048: }
049:
050: public void addSequence(DestinationSequence seq, boolean persist) {
051: seq.setDestination(this );
052: map.put(seq.getIdentifier().getValue(), seq);
053: if (persist) {
054: RMStore store = getReliableEndpoint().getManager()
055: .getStore();
056: if (null != store) {
057: store.createDestinationSequence(seq);
058: }
059: }
060: }
061:
062: public void removeSequence(DestinationSequence seq) {
063: map.remove(seq.getIdentifier().getValue());
064: RMStore store = getReliableEndpoint().getManager().getStore();
065: if (null != store) {
066: store.removeDestinationSequence(seq.getIdentifier());
067: }
068: }
069:
070: /**
071: * Acknowledges receipt of a message. If the message is the last in the
072: * sequence, sends an out-of-band SequenceAcknowledgement unless there a
073: * response will be sent to the acksTo address onto which the acknowldegment
074: * can be piggybacked.
075: *
076: * @param sequenceType the sequenceType object that includes identifier and
077: * message number (and possibly a lastMessage element) for the
078: * message to be acknowledged)
079: * @param replyToAddress the replyTo address of the message that carried
080: * this sequence information
081: * @throws SequenceFault if the sequence specified in
082: * <code>sequenceType</code> does not exist
083: */
084: public void acknowledge(Message message) throws SequenceFault,
085: RMException {
086: SequenceType sequenceType = RMContextUtils
087: .retrieveRMProperties(message, false).getSequence();
088: if (null == sequenceType) {
089: return;
090: }
091:
092: DestinationSequence seq = getSequence(sequenceType
093: .getIdentifier());
094:
095: if (null != seq) {
096: seq.applyDeliveryAssurance(sequenceType.getMessageNumber());
097: seq.acknowledge(message);
098:
099: if (null != sequenceType.getLastMessage()) {
100: seq.setLastMessageNumber(sequenceType
101: .getMessageNumber());
102: ackImmediately(seq, message);
103: }
104: } else {
105: SequenceFaultFactory sff = new SequenceFaultFactory();
106: throw sff.createUnknownSequenceFault(sequenceType
107: .getIdentifier());
108: }
109: }
110:
111: void ackRequested(Message message) throws SequenceFault,
112: RMException {
113: // TODO
114: Collection<AckRequestedType> ars = RMContextUtils
115: .retrieveRMProperties(message, false)
116: .getAcksRequested();
117: if (null == ars) {
118: return;
119: }
120: for (AckRequestedType ar : ars) {
121: Identifier id = ar.getIdentifier();
122: DestinationSequence seq = getSequence(id);
123: if (null == seq) {
124: continue;
125: }
126: ackImmediately(seq, message);
127: }
128: }
129:
130: void ackImmediately(DestinationSequence seq, Message message)
131: throws RMException {
132:
133: seq.scheduleImmediateAcknowledgement();
134:
135: // if we cannot expect an outgoing message to which the
136: // acknowledgement
137: // can be added we need to send an out-of-band
138: // SequenceAcknowledgement message
139:
140: AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(
141: message, false, false);
142: String replyToAddress = null;
143: if (null != maps.getReplyTo()) {
144: replyToAddress = maps.getReplyTo().getAddress().getValue();
145: }
146: if (!(seq.getAcksTo().getAddress().getValue().equals(
147: replyToAddress) || seq
148: .canPiggybackAckOnPartialResponse())) {
149: getReliableEndpoint().getProxy().acknowledge(seq);
150: }
151: }
152:
153: }
|