001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.util.Collection;
021: import java.util.List;
022: import java.util.logging.Level;
023: import java.util.logging.Logger;
024:
025: import javax.wsdl.extensions.ExtensibilityElement;
026: import javax.xml.bind.JAXBException;
027: import javax.xml.namespace.QName;
028:
029: import org.apache.cxf.binding.soap.model.SoapBindingInfo;
030: import org.apache.cxf.binding.soap.model.SoapOperationInfo;
031: import org.apache.cxf.common.logging.LogUtils;
032: import org.apache.cxf.databinding.DataBinding;
033: import org.apache.cxf.endpoint.Endpoint;
034: import org.apache.cxf.interceptor.Interceptor;
035: import org.apache.cxf.jaxb.JAXBDataBinding;
036: import org.apache.cxf.service.Service;
037: import org.apache.cxf.service.factory.ServiceConstructionException;
038: import org.apache.cxf.service.model.BindingInfo;
039: import org.apache.cxf.service.model.BindingOperationInfo;
040: import org.apache.cxf.service.model.EndpointInfo;
041: import org.apache.cxf.service.model.InterfaceInfo;
042: import org.apache.cxf.service.model.MessageInfo;
043: import org.apache.cxf.service.model.MessagePartInfo;
044: import org.apache.cxf.service.model.OperationInfo;
045: import org.apache.cxf.service.model.ServiceInfo;
046: import org.apache.cxf.transport.Conduit;
047: import org.apache.cxf.ws.addressing.Names;
048: import org.apache.cxf.ws.policy.EffectivePolicy;
049: import org.apache.cxf.ws.policy.EndpointPolicy;
050: import org.apache.cxf.ws.policy.PolicyEngine;
051: import org.apache.cxf.ws.policy.PolicyInterceptorProviderRegistry;
052: import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
053: import org.apache.cxf.ws.rm.manager.SourcePolicyType;
054: import org.apache.neethi.Assertion;
055: import org.apache.neethi.Policy;
056:
057: public class RMEndpoint {
058:
059: private static final Logger LOG = LogUtils
060: .getL7dLogger(RMEndpoint.class);
061:
062: private static final QName SERVICE_NAME = new QName(RMConstants
063: .getWsdlNamespace(), "SequenceAbstractService");
064: private static final QName INTERFACE_NAME = new QName(RMConstants
065: .getWsdlNamespace(), "SequenceAbstractPortType");
066: private static final QName BINDING_NAME = new QName(RMConstants
067: .getWsdlNamespace(), "SequenceAbstractSoapBinding");
068:
069: private static final QName CREATE_PART_NAME = new QName(RMConstants
070: .getWsdlNamespace(), "create");
071: private static final QName CREATE_RESPONSE_PART_NAME = new QName(
072: RMConstants.getWsdlNamespace(), "createResponse");
073: private static final QName TERMINATE_PART_NAME = new QName(
074: RMConstants.getWsdlNamespace(), "terminate");
075:
076: private RMManager manager;
077: private Endpoint applicationEndpoint;
078: private Conduit conduit;
079: private org.apache.cxf.ws.addressing.EndpointReferenceType replyTo;
080: private Source source;
081: private Destination destination;
082: private WrappedService service;
083: private Endpoint endpoint;
084: private Proxy proxy;
085: private Servant servant;
086: private long lastApplicationMessage;
087: private long lastControlMessage;
088:
089: public RMEndpoint(RMManager m, Endpoint ae) {
090: manager = m;
091: applicationEndpoint = ae;
092: source = new Source(this );
093: destination = new Destination(this );
094: proxy = new Proxy(this );
095: servant = new Servant(this );
096: }
097:
098: /**
099: * @return Returns the bus.
100: */
101: public RMManager getManager() {
102: return manager;
103: }
104:
105: /**
106: * @return Returns the application endpoint.
107: */
108: public Endpoint getApplicationEndpoint() {
109: return applicationEndpoint;
110: }
111:
112: /**
113: * @return Returns the RM protocol endpoint.
114: */
115: public Endpoint getEndpoint() {
116: return endpoint;
117: }
118:
119: /**
120: * @return Returns the RM protocol service.
121: */
122: public Service getService() {
123: return service;
124: }
125:
126: /**
127: * @return Returns the RM protocol binding info.
128: */
129: public BindingInfo getBindingInfo() {
130: return service.getServiceInfo().getBinding(BINDING_NAME);
131: }
132:
133: /**
134: * @return Returns the proxy.
135: */
136: public Proxy getProxy() {
137: return proxy;
138: }
139:
140: /**
141: * @return Returns the servant.
142: */
143: public Servant getServant() {
144: return servant;
145: }
146:
147: /**
148: * @return Returns the destination.
149: */
150: public Destination getDestination() {
151: return destination;
152: }
153:
154: /**
155: * @param destination The destination to set.
156: */
157: public void setDestination(Destination destination) {
158: this .destination = destination;
159: }
160:
161: /**
162: * @return Returns the source.
163: */
164: public Source getSource() {
165: return source;
166: }
167:
168: /**
169: * @param source The source to set.
170: */
171: public void setSource(Source source) {
172: this .source = source;
173: }
174:
175: /**
176: * @return The time when last application message was received.
177: */
178: public long getLastApplicationMessage() {
179: return lastApplicationMessage;
180: }
181:
182: /**
183: * Indicates that an application message has been received.
184: */
185: public void receivedApplicationMessage() {
186: lastApplicationMessage = System.currentTimeMillis();
187: }
188:
189: /**
190: * @return The time when last RM protocol message was received.
191: */
192: public long getLastControlMessage() {
193: return lastControlMessage;
194: }
195:
196: /**
197: * Indicates that an RM protocol message has been received.
198: */
199: public void receivedControlMessage() {
200: lastControlMessage = System.currentTimeMillis();
201: }
202:
203: /**
204: * @return Returns the conduit.
205: */
206: public Conduit getConduit() {
207: return conduit;
208: }
209:
210: /**
211: * Returns the replyTo address of the first application request, i.e. the
212: * target address to which to send CreateSequence, CreateSequenceResponse
213: * and TerminateSequence messages originating from the from the server.
214: *
215: * @return the replyTo address
216: */
217: org.apache.cxf.ws.addressing.EndpointReferenceType getReplyTo() {
218: return replyTo;
219: }
220:
221: void initialise(Conduit c,
222: org.apache.cxf.ws.addressing.EndpointReferenceType r) {
223: conduit = c;
224: replyTo = r;
225: createService();
226: createEndpoint();
227: setPolicies();
228: }
229:
230: void createService() {
231: ServiceInfo si = new ServiceInfo();
232: si.setName(SERVICE_NAME);
233: buildInterfaceInfo(si);
234:
235: service = new WrappedService(applicationEndpoint.getService(),
236: SERVICE_NAME, si);
237:
238: DataBinding dataBinding = null;
239: try {
240: dataBinding = new JAXBDataBinding(CreateSequenceType.class,
241: CreateSequenceResponseType.class,
242: TerminateSequenceType.class,
243: SequenceFaultType.class);
244: } catch (JAXBException e) {
245: throw new ServiceConstructionException(e);
246: }
247: service.setDataBinding(dataBinding);
248: service.setInvoker(servant);
249: }
250:
251: void createEndpoint() {
252: ServiceInfo si = service.getServiceInfo();
253: buildBindingInfo(si);
254: EndpointInfo aei = applicationEndpoint.getEndpointInfo();
255: String transportId = aei.getTransportId();
256: EndpointInfo ei = new EndpointInfo(si, transportId);
257:
258: ei.setAddress(aei.getAddress());
259:
260: ei.setName(RMConstants.getPortName());
261: ei.setBinding(si.getBinding(BINDING_NAME));
262:
263: // if addressing was enabled on the application endpoint by means
264: // of the UsingAddressing element extensor, use this for the
265: // RM endpoint also
266:
267: Object ua = getUsingAddressing(aei);
268: if (null != ua) {
269: ei.addExtensor(ua);
270: }
271: si.addEndpoint(ei);
272:
273: endpoint = new WrappedEndpoint(applicationEndpoint, ei, service);
274: service.setEndpoint(endpoint);
275: }
276:
277: void setPolicies() {
278: // use same WS-policies as for application endpoint
279: PolicyEngine engine = manager.getBus().getExtension(
280: PolicyEngine.class);
281: if (null == engine || !engine.isEnabled()) {
282: return;
283: }
284:
285: EndpointInfo ei = getEndpoint().getEndpointInfo();
286:
287: PolicyInterceptorProviderRegistry reg = manager.getBus()
288: .getExtension(PolicyInterceptorProviderRegistry.class);
289: EndpointPolicy ep = null == conduit ? engine
290: .getServerEndpointPolicy(applicationEndpoint
291: .getEndpointInfo(), null) : engine
292: .getClientEndpointPolicy(applicationEndpoint
293: .getEndpointInfo(), conduit);
294:
295: engine.setEndpointPolicy(ei, ep);
296:
297: EffectivePolicy effectiveOutbound = new EffectivePolicyImpl(ep,
298: reg, true, false);
299: EffectivePolicy effectiveInbound = new EffectivePolicyImpl(ep,
300: reg, false, false);
301:
302: BindingInfo bi = ei.getBinding();
303: Collection<BindingOperationInfo> bois = bi.getOperations();
304:
305: for (BindingOperationInfo boi : bois) {
306: engine.setEffectiveServerRequestPolicy(ei, boi,
307: effectiveInbound);
308: engine.setEffectiveServerResponsePolicy(ei, boi,
309: effectiveOutbound);
310:
311: engine.setEffectiveClientRequestPolicy(ei, boi,
312: effectiveOutbound);
313: engine.setEffectiveClientResponsePolicy(ei, boi,
314: effectiveInbound);
315: }
316:
317: // TODO: FaultPolicy (SequenceFault)
318: }
319:
320: void buildInterfaceInfo(ServiceInfo si) {
321: InterfaceInfo ii = new InterfaceInfo(si, INTERFACE_NAME);
322: buildOperationInfo(ii);
323: }
324:
325: void buildOperationInfo(InterfaceInfo ii) {
326: buildCreateSequenceOperationInfo(ii);
327: buildTerminateSequenceOperationInfo(ii);
328: buildSequenceAckOperationInfo(ii);
329: buildLastMessageOperationInfo(ii);
330: buildAckRequestedOperationInfo(ii);
331:
332: // TODO: FaultInfo (SequenceFault)
333: }
334:
335: void buildCreateSequenceOperationInfo(InterfaceInfo ii) {
336:
337: OperationInfo operationInfo = null;
338: MessagePartInfo partInfo = null;
339: MessageInfo messageInfo = null;
340:
341: operationInfo = ii.addOperation(RMConstants
342: .getCreateSequenceOperationName());
343: messageInfo = operationInfo.createMessage(RMConstants
344: .getCreateSequenceOperationName());
345: operationInfo.setInput(messageInfo.getName().getLocalPart(),
346: messageInfo);
347: partInfo = messageInfo.addMessagePart(CREATE_PART_NAME);
348: partInfo.setElementQName(RMConstants
349: .getCreateSequenceOperationName());
350: partInfo.setElement(true);
351: partInfo.setTypeClass(CreateSequenceType.class);
352:
353: messageInfo = operationInfo.createMessage(RMConstants
354: .getCreateSequenceResponseOperationName());
355: operationInfo.setOutput(messageInfo.getName().getLocalPart(),
356: messageInfo);
357: partInfo = messageInfo
358: .addMessagePart(CREATE_RESPONSE_PART_NAME);
359: partInfo.setElementQName(RMConstants
360: .getCreateSequenceResponseOperationName());
361: partInfo.setElement(true);
362: partInfo.setTypeClass(CreateSequenceResponseType.class);
363: partInfo.setIndex(0);
364:
365: operationInfo = ii.addOperation(RMConstants
366: .getCreateSequenceOnewayOperationName());
367: messageInfo = operationInfo.createMessage(RMConstants
368: .getCreateSequenceOperationName());
369: operationInfo.setInput(messageInfo.getName().getLocalPart(),
370: messageInfo);
371: partInfo = messageInfo.addMessagePart(CREATE_PART_NAME);
372: partInfo.setElementQName(RMConstants
373: .getCreateSequenceOperationName());
374: partInfo.setElement(true);
375: partInfo.setTypeClass(CreateSequenceType.class);
376:
377: operationInfo = ii.addOperation(RMConstants
378: .getCreateSequenceResponseOnewayOperationName());
379: messageInfo = operationInfo.createMessage(RMConstants
380: .getCreateSequenceResponseOperationName());
381: operationInfo.setInput(messageInfo.getName().getLocalPart(),
382: messageInfo);
383: partInfo = messageInfo
384: .addMessagePart(CREATE_RESPONSE_PART_NAME);
385: partInfo.setElementQName(RMConstants
386: .getCreateSequenceResponseOperationName());
387: partInfo.setElement(true);
388: partInfo.setTypeClass(CreateSequenceResponseType.class);
389: }
390:
391: void buildTerminateSequenceOperationInfo(InterfaceInfo ii) {
392:
393: OperationInfo operationInfo = null;
394: MessagePartInfo partInfo = null;
395: MessageInfo messageInfo = null;
396:
397: operationInfo = ii.addOperation(RMConstants
398: .getTerminateSequenceOperationName());
399: messageInfo = operationInfo.createMessage(RMConstants
400: .getTerminateSequenceOperationName());
401: operationInfo.setInput(messageInfo.getName().getLocalPart(),
402: messageInfo);
403: partInfo = messageInfo.addMessagePart(TERMINATE_PART_NAME);
404: partInfo.setElementQName(RMConstants
405: .getTerminateSequenceOperationName());
406: partInfo.setElement(true);
407: partInfo.setTypeClass(TerminateSequenceType.class);
408: }
409:
410: void buildSequenceAckOperationInfo(InterfaceInfo ii) {
411:
412: OperationInfo operationInfo = null;
413: MessageInfo messageInfo = null;
414:
415: operationInfo = ii.addOperation(RMConstants
416: .getSequenceAckOperationName());
417: messageInfo = operationInfo.createMessage(RMConstants
418: .getSequenceAckOperationName());
419: operationInfo.setInput(messageInfo.getName().getLocalPart(),
420: messageInfo);
421: }
422:
423: void buildLastMessageOperationInfo(InterfaceInfo ii) {
424:
425: OperationInfo operationInfo = null;
426: MessageInfo messageInfo = null;
427:
428: operationInfo = ii.addOperation(RMConstants
429: .getLastMessageOperationName());
430: messageInfo = operationInfo.createMessage(RMConstants
431: .getLastMessageOperationName());
432: operationInfo.setInput(messageInfo.getName().getLocalPart(),
433: messageInfo);
434: }
435:
436: void buildAckRequestedOperationInfo(InterfaceInfo ii) {
437:
438: OperationInfo operationInfo = null;
439: MessageInfo messageInfo = null;
440:
441: operationInfo = ii.addOperation(RMConstants
442: .getAckRequestedOperationName());
443: messageInfo = operationInfo.createMessage(RMConstants
444: .getAckRequestedOperationName());
445: operationInfo.setInput(messageInfo.getName().getLocalPart(),
446: messageInfo);
447: }
448:
449: void buildBindingInfo(ServiceInfo si) {
450: // use same binding id as for application endpoint
451: if (null != applicationEndpoint) {
452: String bindingId = applicationEndpoint.getEndpointInfo()
453: .getBinding().getBindingId();
454: SoapBindingInfo bi = new SoapBindingInfo(si, bindingId);
455: bi.setName(BINDING_NAME);
456: BindingOperationInfo boi = null;
457: SoapOperationInfo soi = null;
458:
459: boi = bi.buildOperation(RMConstants
460: .getCreateSequenceOperationName(), RMConstants
461: .getCreateSequenceOperationName().getLocalPart(),
462: null);
463: soi = new SoapOperationInfo();
464: soi.setAction(RMConstants.getCreateSequenceAction());
465: boi.addExtensor(soi);
466: bi.addOperation(boi);
467:
468: boi = bi.buildOperation(RMConstants
469: .getTerminateSequenceOperationName(),
470: RMConstants.getTerminateSequenceOperationName()
471: .getLocalPart(), null);
472: soi = new SoapOperationInfo();
473: soi.setAction(RMConstants.getTerminateSequenceAction());
474: boi.addExtensor(soi);
475: bi.addOperation(boi);
476:
477: boi = bi.buildOperation(RMConstants
478: .getSequenceAckOperationName(), null, null);
479: assert null != boi;
480: soi = new SoapOperationInfo();
481: soi.setAction(RMConstants.getSequenceAckAction());
482: boi.addExtensor(soi);
483: bi.addOperation(boi);
484:
485: boi = bi.buildOperation(RMConstants
486: .getLastMessageOperationName(), null, null);
487: assert null != boi;
488: soi = new SoapOperationInfo();
489: soi.setAction(RMConstants.getLastMessageAction());
490: boi.addExtensor(soi);
491: bi.addOperation(boi);
492:
493: boi = bi.buildOperation(RMConstants
494: .getAckRequestedOperationName(), null, null);
495: assert null != boi;
496: soi = new SoapOperationInfo();
497: soi.setAction(RMConstants.getAckRequestedAction());
498: boi.addExtensor(soi);
499: bi.addOperation(boi);
500:
501: boi = bi.buildOperation(RMConstants
502: .getCreateSequenceOnewayOperationName(),
503: RMConstants.getCreateSequenceOperationName()
504: .getLocalPart(), null);
505: soi = new SoapOperationInfo();
506: soi.setAction(RMConstants.getCreateSequenceAction());
507: boi.addExtensor(soi);
508: bi.addOperation(boi);
509:
510: boi = bi.buildOperation(RMConstants
511: .getCreateSequenceResponseOnewayOperationName(),
512: RMConstants
513: .getCreateSequenceResponseOperationName()
514: .getLocalPart(), null);
515: soi = new SoapOperationInfo();
516: soi
517: .setAction(RMConstants
518: .getCreateSequenceResponseAction());
519: boi.addExtensor(soi);
520: bi.addOperation(boi);
521:
522: si.addBinding(bi);
523: }
524:
525: // TODO: BindingFaultInfo (SequenceFault)
526: }
527:
528: Object getUsingAddressing(EndpointInfo endpointInfo) {
529: if (null == endpointInfo) {
530: return null;
531: }
532: Object ua = null;
533: List<ExtensibilityElement> exts = endpointInfo
534: .getExtensors(ExtensibilityElement.class);
535: ua = getUsingAddressing(exts);
536: if (null != ua) {
537: return ua;
538: }
539: exts = endpointInfo.getBinding() != null ? endpointInfo
540: .getBinding().getExtensors(ExtensibilityElement.class)
541: : null;
542: ua = getUsingAddressing(exts);
543: if (null != ua) {
544: return ua;
545: }
546: exts = endpointInfo.getService() != null ? endpointInfo
547: .getService().getExtensors(ExtensibilityElement.class)
548: : null;
549: ua = getUsingAddressing(exts);
550: if (null != ua) {
551: return ua;
552: }
553: return ua;
554: }
555:
556: Object getUsingAddressing(List<ExtensibilityElement> exts) {
557: Object ua = null;
558: if (exts != null) {
559: for (ExtensibilityElement ext : exts) {
560: if (Names.WSAW_USING_ADDRESSING_QNAME.equals(ext
561: .getElementType())) {
562: ua = ext;
563: }
564: }
565: }
566: return ua;
567: }
568:
569: void setAplicationEndpoint(Endpoint ae) {
570: applicationEndpoint = ae;
571: }
572:
573: void setManager(RMManager m) {
574: manager = m;
575: }
576:
577: void shutdown() {
578: // cancel outstanding timer tasks (deferred acknowledgements)
579: // and scheduled termination for all
580: // destination sequences of this endpoint
581:
582: for (DestinationSequence ds : getDestination()
583: .getAllSequences()) {
584: ds.cancelDeferredAcknowledgments();
585: ds.cancelTermination();
586: }
587:
588: // try terminating sequences
589: SourcePolicyType sp = manager.getSourcePolicy();
590: SequenceTerminationPolicyType stp = null;
591: if (null != sp) {
592: stp = sp.getSequenceTerminationPolicy();
593: }
594: if (null != stp && stp.isTerminateOnShutdown()) {
595:
596: Collection<SourceSequence> seqs = source
597: .getAllUnacknowledgedSequences();
598: LOG.log(Level.FINE, "Trying to terminate {0} sequences",
599: seqs.size());
600: for (SourceSequence seq : seqs) {
601: try {
602: // destination MUST respond with a
603: // sequence acknowledgement
604: if (seq.isLastMessage()) {
605: // REVISIT: this may be non-standard
606: // getProxy().ackRequested(seq);
607: } else {
608:
609: getProxy().lastMessage(seq);
610: }
611: } catch (RMException ex) {
612: // already logged
613: }
614: }
615: }
616:
617: // cancel outstanding resends for all source sequences
618: // of this endpoint
619:
620: for (SourceSequence ss : getSource().getAllSequences()) {
621: manager.getRetransmissionQueue().stop(ss);
622: }
623: }
624:
625: class EffectivePolicyImpl implements EffectivePolicy {
626:
627: private EndpointPolicy endpointPolicy;
628: private List<Interceptor> interceptors;
629:
630: EffectivePolicyImpl(EndpointPolicy ep,
631: PolicyInterceptorProviderRegistry reg,
632: boolean outbound, boolean fault) {
633: endpointPolicy = ep;
634: interceptors = reg.getInterceptors(endpointPolicy
635: .getChosenAlternative(), outbound, fault);
636: }
637:
638: public Collection<Assertion> getChosenAlternative() {
639: return endpointPolicy.getChosenAlternative();
640: }
641:
642: public List<Interceptor> getInterceptors() {
643: return interceptors;
644: }
645:
646: public Policy getPolicy() {
647: return endpointPolicy.getPolicy();
648: }
649: }
650: }
|