001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.util.Collections;
021: import java.util.Map;
022: import java.util.logging.Level;
023: import java.util.logging.Logger;
024:
025: import javax.xml.datatype.Duration;
026:
027: import org.apache.cxf.Bus;
028: import org.apache.cxf.common.logging.LogUtils;
029: import org.apache.cxf.endpoint.Client;
030: import org.apache.cxf.endpoint.ClientImpl;
031: import org.apache.cxf.endpoint.ConduitSelector;
032: import org.apache.cxf.endpoint.DeferredConduitSelector;
033: import org.apache.cxf.endpoint.Endpoint;
034: import org.apache.cxf.message.Message;
035: import org.apache.cxf.service.model.BindingInfo;
036: import org.apache.cxf.service.model.BindingOperationInfo;
037: import org.apache.cxf.service.model.EndpointInfo;
038: import org.apache.cxf.service.model.InterfaceInfo;
039: import org.apache.cxf.service.model.OperationInfo;
040: import org.apache.cxf.transport.Conduit;
041: import org.apache.cxf.ws.addressing.AttributedURIType;
042: import org.apache.cxf.ws.addressing.RelatesToType;
043: import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
044: import org.apache.cxf.ws.rm.manager.SourcePolicyType;
045:
046: /**
047: *
048: */
049: public class Proxy {
050:
051: private static final Logger LOG = LogUtils
052: .getL7dLogger(Proxy.class);
053:
054: private RMEndpoint reliableEndpoint;
055: // REVISIT assumption there is only a single outstanding offer
056: private Identifier offeredIdentifier;
057:
058: public Proxy(RMEndpoint rme) {
059: reliableEndpoint = rme;
060: }
061:
062: RMEndpoint getReliableEndpoint() {
063: return reliableEndpoint;
064: }
065:
066: void acknowledge(DestinationSequence ds) throws RMException {
067: if (RMConstants.getAnonymousAddress().equals(
068: ds.getAcksTo().getAddress().getValue())) {
069: LOG
070: .log(Level.WARNING,
071: "STANDALONE_ANON_ACKS_NOT_SUPPORTED");
072: return;
073: }
074:
075: OperationInfo oi = reliableEndpoint
076: .getEndpoint()
077: .getEndpointInfo()
078: .getService()
079: .getInterface()
080: .getOperation(RMConstants.getSequenceAckOperationName());
081: invoke(oi, new Object[] {}, null);
082: }
083:
084: void terminate(SourceSequence ss) throws RMException {
085: OperationInfo oi = reliableEndpoint
086: .getEndpoint()
087: .getEndpointInfo()
088: .getService()
089: .getInterface()
090: .getOperation(
091: RMConstants.getTerminateSequenceOperationName());
092:
093: TerminateSequenceType ts = RMUtils.getWSRMFactory()
094: .createTerminateSequenceType();
095: ts.setIdentifier(ss.getIdentifier());
096: invoke(oi, new Object[] { ts }, null);
097: }
098:
099: void createSequenceResponse(
100: final CreateSequenceResponseType createResponse)
101: throws RMException {
102: LOG.fine("sending CreateSequenceResponse from client side");
103: final OperationInfo oi = reliableEndpoint
104: .getEndpoint()
105: .getEndpointInfo()
106: .getService()
107: .getInterface()
108: .getOperation(
109: RMConstants
110: .getCreateSequenceResponseOnewayOperationName());
111:
112: // TODO: need to set relatesTo
113:
114: invoke(oi, new Object[] { createResponse }, null);
115:
116: }
117:
118: public CreateSequenceResponseType createSequence(
119: EndpointReferenceType defaultAcksTo,
120: RelatesToType relatesTo, boolean isServer)
121: throws RMException {
122:
123: SourcePolicyType sp = reliableEndpoint.getManager()
124: .getSourcePolicy();
125: final CreateSequenceType create = RMUtils.getWSRMFactory()
126: .createCreateSequenceType();
127:
128: String address = sp.getAcksTo();
129: EndpointReferenceType acksTo = null;
130: if (null != address) {
131: acksTo = RMUtils.createReference2004(address);
132: } else {
133: acksTo = defaultAcksTo;
134: }
135: create.setAcksTo(acksTo);
136:
137: Duration d = sp.getSequenceExpiration();
138: if (null != d) {
139: Expires expires = RMUtils.getWSRMFactory().createExpires();
140: expires.setValue(d);
141: create.setExpires(expires);
142: }
143:
144: if (sp.isIncludeOffer()) {
145: OfferType offer = RMUtils.getWSRMFactory()
146: .createOfferType();
147: d = sp.getOfferedSequenceExpiration();
148: if (null != d) {
149: Expires expires = RMUtils.getWSRMFactory()
150: .createExpires();
151: expires.setValue(d);
152: offer.setExpires(expires);
153: }
154: offer.setIdentifier(reliableEndpoint.getSource()
155: .generateSequenceIdentifier());
156: create.setOffer(offer);
157: setOfferedIdentifier(offer);
158: }
159:
160: InterfaceInfo ii = reliableEndpoint.getEndpoint()
161: .getEndpointInfo().getService().getInterface();
162:
163: final OperationInfo oi = isServer ? ii.getOperation(RMConstants
164: .getCreateSequenceOnewayOperationName()) : ii
165: .getOperation(RMConstants
166: .getCreateSequenceOperationName());
167:
168: // tried using separate thread - did not help either
169:
170: if (isServer) {
171: LOG.fine("sending CreateSequenceRequest from server side");
172: Runnable r = new Runnable() {
173: public void run() {
174: try {
175: invoke(oi, new Object[] { create }, null);
176: } catch (RMException ex) {
177: // already logged
178: }
179: }
180: };
181: reliableEndpoint.getApplicationEndpoint().getExecutor()
182: .execute(r);
183: return null;
184: }
185:
186: return (CreateSequenceResponseType) invoke(oi,
187: new Object[] { create }, null);
188: }
189:
190: void lastMessage(SourceSequence s) throws RMException {
191: org.apache.cxf.ws.addressing.EndpointReferenceType target = s
192: .getTarget();
193: AttributedURIType uri = null;
194: if (null != target) {
195: uri = target.getAddress();
196: }
197: String addr = null;
198: if (null != uri) {
199: addr = uri.getValue();
200: }
201:
202: if (addr == null) {
203: LOG.log(Level.WARNING,
204: "STANDALONE_LAST_MESSAGE_NO_TARGET_MSG");
205: return;
206: }
207:
208: if (RMUtils.getAddressingConstants().getAnonymousURI().equals(
209: addr)) {
210: LOG.log(Level.WARNING,
211: "STANDALONE_LAST_MESSAGE_ANON_TARGET_MSG");
212: return;
213: }
214:
215: OperationInfo oi = reliableEndpoint
216: .getEndpoint()
217: .getEndpointInfo()
218: .getService()
219: .getInterface()
220: .getOperation(RMConstants.getLastMessageOperationName());
221: // pass reference to source sequence in invocation context
222: Map<String, Object> context = Collections.singletonMap(
223: SourceSequence.class.getName(), (Object) s);
224:
225: invoke(oi, new Object[] {}, context);
226: }
227:
228: void ackRequested(SourceSequence s) throws RMException {
229: org.apache.cxf.ws.addressing.EndpointReferenceType target = s
230: .getTarget();
231: AttributedURIType uri = null;
232: if (null != target) {
233: uri = target.getAddress();
234: }
235: String addr = null;
236: if (null != uri) {
237: addr = uri.getValue();
238: }
239:
240: if (addr == null) {
241: LOG.log(Level.WARNING,
242: "STANDALONE_ACK_REQUESTED_NO_TARGET_MSG");
243: return;
244: }
245:
246: if (RMUtils.getAddressingConstants().getAnonymousURI().equals(
247: addr)) {
248: LOG.log(Level.WARNING,
249: "STANDALONE_ACK_REQUESTED_ANON_TARGET_MSG");
250: return;
251: }
252:
253: OperationInfo oi = reliableEndpoint.getEndpoint()
254: .getEndpointInfo().getService().getInterface()
255: .getOperation(
256: RMConstants.getAckRequestedOperationName());
257: invoke(oi, new Object[] {}, null);
258: }
259:
260: Identifier getOfferedIdentifier() {
261: return offeredIdentifier;
262: }
263:
264: void setOfferedIdentifier(OfferType offer) {
265: if (offer != null) {
266: offeredIdentifier = offer.getIdentifier();
267: }
268: }
269:
270: Object invoke(OperationInfo oi, Object[] params,
271: Map<String, Object> context) throws RMException {
272:
273: if (LOG.isLoggable(Level.INFO)) {
274: LOG.log(Level.INFO,
275: "Sending out-of-band RM protocol message {0}.",
276: oi == null ? null : oi.getName());
277: }
278:
279: RMManager manager = reliableEndpoint.getManager();
280: Bus bus = manager.getBus();
281: Endpoint endpoint = reliableEndpoint.getEndpoint();
282: BindingInfo bi = reliableEndpoint.getBindingInfo();
283: Conduit c = reliableEndpoint.getConduit();
284: org.apache.cxf.ws.addressing.EndpointReferenceType replyTo = reliableEndpoint
285: .getReplyTo();
286: Client client = createClient(bus, endpoint, c, replyTo);
287:
288: BindingOperationInfo boi = bi.getOperation(oi);
289: try {
290: Object[] result = client.invoke(boi, params, context);
291: if (result != null && result.length > 0) {
292: return result[0];
293: }
294:
295: } catch (Exception ex) {
296: org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
297: "SEND_PROTOCOL_MSG_FAILED_EXC", LOG,
298: oi == null ? null : oi.getName());
299: LOG.log(Level.SEVERE, msg.toString(), ex);
300: throw new RMException(msg, ex);
301: }
302: return null;
303: }
304:
305: protected Client createClient(
306: Bus bus,
307: Endpoint endpoint,
308: Conduit conduit,
309: final org.apache.cxf.ws.addressing.EndpointReferenceType address) {
310: ConduitSelector cs = new DeferredConduitSelector(conduit) {
311: @Override
312: public synchronized Conduit selectConduit(Message message) {
313: Conduit conduit = null;
314: EndpointInfo endpointInfo = getEndpoint()
315: .getEndpointInfo();
316: org.apache.cxf.ws.addressing.EndpointReferenceType original = endpointInfo
317: .getTarget();
318: try {
319: if (null != address) {
320: endpointInfo.setAddress(address);
321: }
322: conduit = super .selectConduit(message);
323: } finally {
324: endpointInfo.setAddress(original);
325: }
326: return conduit;
327: }
328: };
329: return new RMClient(bus, endpoint, cs);
330: }
331:
332: class RMClient extends ClientImpl {
333:
334: RMClient(Bus bus, Endpoint endpoint, ConduitSelector cs) {
335: super (bus, endpoint, cs);
336: }
337:
338: @Override
339: public void onMessage(Message m) {
340: m.getExchange().put(
341: Endpoint.class,
342: Proxy.this .reliableEndpoint
343: .getApplicationEndpoint());
344: super .onMessage(m);
345: }
346: }
347:
348: // for test
349:
350: void setReliableEndpoint(RMEndpoint rme) {
351: reliableEndpoint = rme;
352: }
353:
354: }
|