001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.math.BigInteger;
021: import java.util.Collections;
022: import java.util.List;
023: import java.util.logging.Level;
024: import java.util.logging.Logger;
025:
026: import javax.xml.datatype.Duration;
027:
028: import org.apache.cxf.common.logging.LogUtils;
029: import org.apache.cxf.jaxb.DatatypeFactory;
030: import org.apache.cxf.message.Exchange;
031: import org.apache.cxf.message.Message;
032: import org.apache.cxf.service.invoker.Invoker;
033: import org.apache.cxf.service.model.OperationInfo;
034: import org.apache.cxf.ws.addressing.AddressingProperties;
035: import org.apache.cxf.ws.addressing.VersionTransformer;
036: import org.apache.cxf.ws.addressing.v200408.AttributedURI;
037: import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
038:
039: /**
040: *
041: */
042: public class Servant implements Invoker {
043:
044: private static final Logger LOG = LogUtils
045: .getL7dLogger(Servant.class);
046: private RMEndpoint reliableEndpoint;
047: // REVISIT assumption there is only a single outstanding unattached Identifier
048: private Identifier unattachedIdentifier;
049:
050: Servant(RMEndpoint rme) {
051: reliableEndpoint = rme;
052: }
053:
054: public Object invoke(Exchange exchange, Object o) {
055: LOG.fine("Invoking on RM Endpoint");
056: OperationInfo oi = exchange.get(OperationInfo.class);
057: if (null == oi) {
058: LOG.fine("No operation info.");
059: return null;
060: }
061:
062: // TODO: throw Fault, see AbstractRMInterceptor
063:
064: if (RMConstants.getCreateSequenceOperationName().equals(
065: oi.getName())
066: || RMConstants.getCreateSequenceOnewayOperationName()
067: .equals(oi.getName())) {
068: try {
069: return Collections
070: .singletonList(createSequence(exchange
071: .getInMessage()));
072: } catch (SequenceFault ex) {
073: ex.printStackTrace();
074: } catch (Exception ex) {
075: ex.printStackTrace();
076: }
077: } else if (RMConstants
078: .getCreateSequenceResponseOnewayOperationName().equals(
079: oi.getName())) {
080: CreateSequenceResponseType createResponse = (CreateSequenceResponseType) getParameter(exchange
081: .getInMessage());
082: try {
083: createSequenceResponse(createResponse);
084: } catch (SequenceFault ex) {
085: ex.printStackTrace();
086: }
087: } else if (RMConstants.getTerminateSequenceOperationName()
088: .equals(oi.getName())) {
089: try {
090: terminateSequence(exchange.getInMessage());
091: } catch (SequenceFault ex) {
092: ex.printStackTrace();
093: } catch (RMException ex) {
094: ex.printStackTrace();
095: }
096: }
097:
098: return null;
099: }
100:
101: CreateSequenceResponseType createSequence(Message message)
102: throws SequenceFault {
103: LOG.fine("Creating sequence");
104:
105: AddressingProperties maps = RMContextUtils.retrieveMAPs(
106: message, false, false);
107: Message outMessage = message.getExchange().getOutMessage();
108: if (null != outMessage) {
109: RMContextUtils.storeMAPs(maps, outMessage, false, false);
110: }
111:
112: CreateSequenceType create = (CreateSequenceType) getParameter(message);
113: Destination destination = reliableEndpoint.getDestination();
114:
115: CreateSequenceResponseType createResponse = RMUtils
116: .getWSRMFactory().createCreateSequenceResponseType();
117: createResponse.setIdentifier(destination
118: .generateSequenceIdentifier());
119:
120: DestinationPolicyType dp = reliableEndpoint.getManager()
121: .getDestinationPolicy();
122: Duration supportedDuration = dp.getSequenceExpiration();
123: if (null == supportedDuration) {
124: supportedDuration = DatatypeFactory.PT0S;
125: }
126: Expires ex = create.getExpires();
127:
128: if (null != ex
129: || supportedDuration
130: .isShorterThan(DatatypeFactory.PT0S)) {
131: Duration effectiveDuration = supportedDuration;
132: if (null != ex
133: && supportedDuration.isLongerThan(ex.getValue())) {
134: effectiveDuration = supportedDuration;
135: }
136: ex = RMUtils.getWSRMFactory().createExpires();
137: ex.setValue(effectiveDuration);
138: createResponse.setExpires(ex);
139: }
140:
141: OfferType offer = create.getOffer();
142: if (null != offer) {
143: AcceptType accept = RMUtils.getWSRMFactory()
144: .createAcceptType();
145: if (dp.isAcceptOffers()) {
146: Source source = reliableEndpoint.getSource();
147: LOG.fine("Accepting inbound sequence offer");
148: // AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
149: AttributedURI to = VersionTransformer.convert(maps
150: .getTo());
151: accept.setAcksTo(RMUtils.createReference2004(to
152: .getValue()));
153: SourceSequence seq = new SourceSequence(offer
154: .getIdentifier(), null, createResponse
155: .getIdentifier());
156: seq.setExpires(offer.getExpires());
157: seq.setTarget(VersionTransformer.convert(create
158: .getAcksTo()));
159: source.addSequence(seq);
160: source.setCurrent(createResponse.getIdentifier(), seq);
161: if (LOG.isLoggable(Level.FINE)) {
162: LOG
163: .fine("Making offered sequence the current sequence for responses to "
164: + createResponse.getIdentifier()
165: .getValue());
166: }
167: } else {
168: if (LOG.isLoggable(Level.FINE)) {
169: LOG.fine("Refusing inbound sequence offer");
170: }
171: accept.setAcksTo(RMUtils.createNoneReference2004());
172: }
173: createResponse.setAccept(accept);
174: }
175:
176: DestinationSequence seq = new DestinationSequence(
177: createResponse.getIdentifier(), create.getAcksTo(),
178: destination);
179: seq.setCorrelationID(maps.getMessageID().getValue());
180: destination.addSequence(seq);
181: LOG.fine("returning " + createResponse);
182: return createResponse;
183: }
184:
185: public void createSequenceResponse(
186: CreateSequenceResponseType createResponse)
187: throws SequenceFault {
188: LOG.fine("Creating sequence response");
189:
190: SourceSequence seq = new SourceSequence(createResponse
191: .getIdentifier());
192: seq.setExpires(createResponse.getExpires());
193: Source source = reliableEndpoint.getSource();
194: source.addSequence(seq);
195:
196: // the incoming sequence ID is either used as the requestor sequence
197: // (signalled by null) or associated with a corresponding sequence
198: // identifier
199: source.setCurrent(clearUnattachedIdentifier(), seq);
200:
201: // if a sequence was offered and accepted, then we can add this to
202: // to the local destination sequence list, otherwise we have to wait for
203: // and incoming CreateSequence request
204:
205: Identifier offeredId = reliableEndpoint.getProxy()
206: .getOfferedIdentifier();
207: if (null != offeredId) {
208: AcceptType accept = createResponse.getAccept();
209: assert null != accept;
210: Destination dest = reliableEndpoint.getDestination();
211: String address = accept.getAcksTo().getAddress().getValue();
212: if (!RMUtils.getAddressingConstants().getNoneURI().equals(
213: address)) {
214: DestinationSequence ds = new DestinationSequence(
215: offeredId, accept.getAcksTo(), dest);
216: dest.addSequence(ds);
217: }
218: }
219: }
220:
221: public void terminateSequence(Message message)
222: throws SequenceFault, RMException {
223: LOG.fine("Terminating sequence");
224:
225: TerminateSequenceType terminate = (TerminateSequenceType) getParameter(message);
226:
227: // check if the terminated sequence was created in response to a a createSequence
228: // request
229:
230: Destination destination = reliableEndpoint.getDestination();
231: Identifier sid = terminate.getIdentifier();
232: DestinationSequence terminatedSeq = destination
233: .getSequence(sid);
234: if (null == terminatedSeq) {
235: // TODO
236: LOG.severe("No such sequence.");
237: return;
238: }
239:
240: destination.removeSequence(terminatedSeq);
241:
242: // the following may be necessary if the last message for this sequence was a oneway
243: // request and hence there was no response to which a last message could have been added
244:
245: // REVISIT: A last message for the correlated sequence should have been sent by the time
246: // the last message for the underlying sequence was received.
247:
248: Source source = reliableEndpoint.getSource();
249:
250: for (SourceSequence outboundSeq : source.getAllSequences()) {
251: if (outboundSeq.offeredBy(sid)
252: && !outboundSeq.isLastMessage()) {
253:
254: if (BigInteger.ZERO.equals(outboundSeq
255: .getCurrentMessageNr())) {
256: source.removeSequence(outboundSeq);
257: }
258: // send an out of band message with an empty body and a
259: // sequence header containing a lastMessage element.
260:
261: /*
262: Proxy proxy = new Proxy(reliableEndpoint);
263: try {
264: proxy.lastMessage(outboundSeq);
265: } catch (RMException ex) {
266: LogUtils.log(LOG, Level.SEVERE, "CORRELATED_SEQ_TERMINATION_EXC", ex);
267: }
268: */
269:
270: break;
271: }
272: }
273:
274: }
275:
276: Object getParameter(Message message) {
277: List resList = null;
278: // assert message == message.getExchange().getInMessage();
279:
280: if (message != null) {
281: resList = message.getContent(List.class);
282: }
283:
284: if (resList != null) {
285: return resList.get(0);
286: }
287: return null;
288: }
289:
290: Identifier clearUnattachedIdentifier() {
291: Identifier ret = unattachedIdentifier;
292: unattachedIdentifier = null;
293: return ret;
294: }
295:
296: void setUnattachedIdentifier(Identifier i) {
297: unattachedIdentifier = i;
298: }
299: }
|