001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.math.BigInteger;
021: import java.util.Collection;
022: import java.util.Map;
023: import java.util.logging.Level;
024: import java.util.logging.Logger;
025:
026: import org.apache.cxf.common.logging.LogUtils;
027: import org.apache.cxf.message.FaultMode;
028: import org.apache.cxf.message.Message;
029: import org.apache.cxf.message.MessageUtils;
030: import org.apache.cxf.ws.addressing.AddressingProperties;
031: import org.apache.cxf.ws.addressing.AttributedURIType;
032: import org.apache.cxf.ws.addressing.MAPAggregator;
033: import org.apache.cxf.ws.addressing.VersionTransformer;
034: import org.apache.cxf.ws.addressing.v200408.AttributedURI;
035:
036: /**
037: *
038: */
039: public class RMOutInterceptor extends AbstractRMInterceptor {
040:
041: private static final Logger LOG = LogUtils
042: .getL7dLogger(RMOutInterceptor.class);
043:
044: public RMOutInterceptor() {
045: addAfter(MAPAggregator.class.getName());
046: }
047:
048: protected void handle(Message message) throws SequenceFault,
049: RMException {
050: if (isRuntimeFault(message)) {
051: LogUtils.log(LOG, Level.WARNING, "RUNTIME_FAULT_MSG");
052: // TODO: in case of a SequenceFault need to set action
053: // to http://schemas.xmlsoap.org/ws/2004/a08/addressing/fault
054: // but: need to defer propagation of received MAPS to outbound chain first
055: return;
056: }
057:
058: AddressingProperties maps = RMContextUtils.retrieveMAPs(
059: message, false, true);
060: if (null == maps) {
061: LogUtils.log(LOG, Level.WARNING,
062: "MAPS_RETRIEVAL_FAILURE_MSG");
063: return;
064: }
065:
066: RMContextUtils.ensureExposedVersion(maps);
067:
068: Source source = getManager().getSource(message);
069: Destination destination = getManager().getDestination(message);
070:
071: String action = null;
072: if (maps != null && null != maps.getAction()) {
073: action = maps.getAction().getValue();
074: }
075:
076: if (LOG.isLoggable(Level.FINE)) {
077: LOG.fine("Action: " + action);
078: }
079:
080: boolean isApplicationMessage = !RMContextUtils
081: .isRMProtocolMessage(action);
082: boolean isPartialResponse = MessageUtils
083: .isPartialResponse(message);
084: boolean isLastMessage = RMConstants.getLastMessageAction()
085: .equals(action);
086:
087: if (isApplicationMessage && !isPartialResponse) {
088: RetransmissionInterceptor ri = new RetransmissionInterceptor();
089: ri.setManager(getManager());
090: // TODO:
091: // On the server side: If a fault occurs after this interceptor we will switch
092: // interceptor chains (if this is not already a fault message) and therefore need to
093: // make sure the retransmission interceptor is added to the fault chain
094: //
095: message.getInterceptorChain().add(ri);
096: LOG.fine("Added RetransmissionInterceptor to chain.");
097:
098: getManager().getRetransmissionQueue().start();
099: }
100:
101: RMProperties rmpsOut = (RMProperties) RMContextUtils
102: .retrieveRMProperties(message, true);
103: if (null == rmpsOut) {
104: rmpsOut = new RMProperties();
105: RMContextUtils.storeRMProperties(message, rmpsOut, true);
106: }
107:
108: RMProperties rmpsIn = null;
109: Identifier inSeqId = null;
110: BigInteger inMessageNumber = null;
111:
112: if (isApplicationMessage) {
113: rmpsIn = (RMProperties) RMContextUtils
114: .retrieveRMProperties(message, false);
115: if (null != rmpsIn && null != rmpsIn.getSequence()) {
116: inSeqId = rmpsIn.getSequence().getIdentifier();
117: inMessageNumber = rmpsIn.getSequence()
118: .getMessageNumber();
119: }
120: }
121:
122: if ((isApplicationMessage || isLastMessage)
123: && !isPartialResponse) {
124: if (LOG.isLoggable(Level.FINE)) {
125: LOG.fine("inbound sequence: "
126: + (null == inSeqId ? "null" : inSeqId
127: .getValue()));
128: }
129:
130: // get the current sequence, requesting the creation of a new one if necessary
131:
132: synchronized (source) {
133: SourceSequence seq = null;
134: if (isLastMessage) {
135: Map<?, ?> invocationContext = (Map) message
136: .get(Message.INVOCATION_CONTEXT);
137: seq = (SourceSequence) invocationContext
138: .get(SourceSequence.class.getName());
139: } else {
140: seq = getManager().getSequence(inSeqId, message,
141: maps);
142: }
143: assert null != seq;
144:
145: // increase message number and store a sequence type object in
146: // context
147:
148: seq.nextMessageNumber(inSeqId, inMessageNumber,
149: isLastMessage);
150:
151: rmpsOut.setSequence(seq);
152:
153: // if this was the last message in the sequence, reset the
154: // current sequence so that a new one will be created next
155: // time the handler is invoked
156:
157: if (seq.isLastMessage()) {
158: source.setCurrent(null);
159: }
160: }
161: } else {
162: if (!MessageUtils.isRequestor(message)
163: && RMConstants.getCreateSequenceAction().equals(
164: action)) {
165: maps.getAction().setValue(
166: RMConstants.getCreateSequenceResponseAction());
167: }
168: }
169:
170: // add Acknowledgements (to application messages or explicitly
171: // created Acknowledgement messages only)
172:
173: if (isApplicationMessage
174: || RMConstants.getSequenceAcknowledgmentAction()
175: .equals(action)) {
176: AttributedURI to = VersionTransformer.convert(maps.getTo());
177: assert null != to;
178: addAcknowledgements(destination, rmpsOut, inSeqId, to);
179: if (isPartialResponse && rmpsOut.getAcks() != null
180: && rmpsOut.getAcks().size() > 0) {
181: AttributedURIType actionURI = new AttributedURIType();
182: actionURI.setValue(RMConstants
183: .getSequenceAcknowledgmentAction());
184: maps.setAction(actionURI);
185: }
186: }
187:
188: if (RMConstants.getSequenceAckAction().equals(action)
189: || RMConstants.getTerminateSequenceAction().equals(
190: action)) {
191: maps.setReplyTo(RMUtils.createNoneReference());
192: }
193:
194: assertReliability(message);
195: }
196:
197: void addAcknowledgements(Destination destination,
198: RMProperties rmpsOut, Identifier inSeqId, AttributedURI to) {
199: for (DestinationSequence seq : destination.getAllSequences()) {
200: if (!seq.sendAcknowledgement()) {
201: if (LOG.isLoggable(Level.FINE)) {
202: LOG
203: .fine("no need to add acknowledgements for sequence "
204: + seq.getIdentifier().getValue());
205: }
206: continue;
207: }
208: if (!to.getValue().equals(
209: seq.getAcksTo().getAddress().getValue())) {
210: if (LOG.isLoggable(Level.FINE)) {
211: LOG.fine("sequences acksTo address ("
212: + seq.getAcksTo().getAddress().getValue()
213: + ") does not match to address ("
214: + to.getValue() + ")");
215: }
216: continue;
217: }
218: // there may be multiple sources with anonymous acksTo
219: if (RMConstants.getAnonymousAddress().equals(
220: seq.getAcksTo().getAddress().getValue())
221: && !AbstractSequence.identifierEquals(seq
222: .getIdentifier(), inSeqId)) {
223: if (LOG.isLoggable(Level.FINE)) {
224: LOG
225: .fine("sequence identifier does not match inbound sequence identifier");
226: }
227: continue;
228: }
229: rmpsOut.addAck(seq);
230: }
231:
232: if (LOG.isLoggable(Level.FINE)) {
233: Collection<SequenceAcknowledgement> acks = rmpsOut
234: .getAcks();
235: if (null == acks) {
236: LOG.fine("No acknowledgements added.");
237: } else {
238: LOG.fine("Added " + acks.size() + " acknowledgements.");
239: }
240: }
241: }
242:
243: boolean isRuntimeFault(Message message) {
244: FaultMode mode = MessageUtils.getFaultMode(message);
245: if (null == mode) {
246: return false;
247: }
248: return FaultMode.CHECKED_APPLICATION_FAULT != mode;
249: }
250: }
|