001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.util.Collection;
021: import java.util.logging.Level;
022: import java.util.logging.Logger;
023:
024: import org.apache.cxf.common.logging.LogUtils;
025: import org.apache.cxf.message.Message;
026: import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
027: import org.apache.cxf.ws.addressing.MAPAggregator;
028:
029: /**
030: *
031: */
032: public class RMInInterceptor extends AbstractRMInterceptor {
033:
034: private static final Logger LOG = LogUtils
035: .getL7dLogger(RMInInterceptor.class);
036:
037: public RMInInterceptor() {
038:
039: addBefore(MAPAggregator.class.getName());
040: }
041:
042: protected void handle(Message message) throws SequenceFault,
043: RMException {
044: LOG.entering(getClass().getName(), "handleMessage");
045:
046: RMProperties rmps = RMContextUtils.retrieveRMProperties(
047: message, false);
048:
049: // message addressing properties may be null, e.g. in case of a runtime fault
050: // on the server side
051: final AddressingPropertiesImpl maps = RMContextUtils
052: .retrieveMAPs(message, false, false);
053: if (null == maps) {
054: return;
055: }
056:
057: String action = null;
058: if (null != maps.getAction()) {
059: action = maps.getAction().getValue();
060: }
061:
062: if (LOG.isLoggable(Level.FINE)) {
063: LOG.fine("Action: " + action);
064: }
065:
066: Object originalRequestor = message
067: .get(RMMessageConstants.ORIGINAL_REQUESTOR_ROLE);
068: if (null != originalRequestor) {
069: LOG.fine("Restoring original requestor role to: "
070: + originalRequestor);
071: message.put(Message.REQUESTOR_ROLE, originalRequestor);
072: }
073:
074: // Destination destination = getManager().getDestination(message);
075: // RMEndpoint rme = getManager().getReliableEndpoint(message);
076: // Servant servant = new Servant(rme);
077:
078: boolean isServer = RMContextUtils.isServerSide(message);
079: LOG.fine("isServerSide: " + isServer);
080: boolean isApplicationMessage = !RMContextUtils
081: .isRMProtocolMessage(action);
082: LOG.fine("isApplicationMessage: " + isApplicationMessage);
083:
084: // for application AND out of band messages
085:
086: RMEndpoint rme = getManager().getReliableEndpoint(message);
087: Destination destination = getManager().getDestination(message);
088:
089: if (isApplicationMessage) {
090: if (null != rmps) {
091: processAcknowledgments(rme.getSource(), rmps);
092: processAcknowledgmentRequests(destination, message);
093: processSequence(destination, message);
094: processDeliveryAssurance(rmps);
095: }
096: rme.receivedApplicationMessage();
097: } else {
098: rme.receivedControlMessage();
099: if (RMConstants.getSequenceAckAction().equals(action)) {
100: processAcknowledgments(rme.getSource(), rmps);
101: } else if (RMConstants.getLastMessageAction()
102: .equals(action)) {
103: processSequence(destination, message);
104: } else if (RMConstants.getCreateSequenceAction().equals(
105: action)
106: && !isServer) {
107: LOG
108: .fine("Processing inbound CreateSequence on client side.");
109: Servant servant = rme.getServant();
110: CreateSequenceResponseType csr = servant
111: .createSequence(message);
112: Proxy proxy = rme.getProxy();
113: proxy.createSequenceResponse(csr);
114: return;
115: }
116: }
117:
118: assertReliability(message);
119: }
120:
121: void processAcknowledgments(Source source, RMProperties rmps)
122: throws SequenceFault, RMException {
123:
124: Collection<SequenceAcknowledgement> acks = rmps.getAcks();
125: if (null != acks) {
126: for (SequenceAcknowledgement ack : acks) {
127: Identifier id = ack.getIdentifier();
128: SourceSequence ss = source.getSequence(id);
129: if (null != ss) {
130: ss.setAcknowledged(ack);
131: } else {
132: throw (new SequenceFaultFactory())
133: .createUnknownSequenceFault(id);
134: }
135: }
136: }
137: }
138:
139: void processAcknowledgmentRequests(Destination destination,
140: Message message) throws SequenceFault, RMException {
141: destination.ackRequested(message);
142: }
143:
144: void processSequence(Destination destination, Message message)
145: throws SequenceFault, RMException {
146: destination.acknowledge(message);
147: }
148:
149: void processDeliveryAssurance(RMProperties rmps) {
150:
151: }
152: }
|