001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.ws.rm;
019:
020: import java.math.BigInteger;
021: import java.util.Collection;
022: import java.util.HashMap;
023: import java.util.Map;
024: import java.util.Timer;
025: import java.util.logging.Level;
026: import java.util.logging.Logger;
027:
028: import javax.annotation.PostConstruct;
029: import javax.annotation.PreDestroy;
030: import javax.annotation.Resource;
031: import javax.xml.namespace.QName;
032:
033: import org.apache.cxf.Bus;
034: import org.apache.cxf.binding.Binding;
035: import org.apache.cxf.common.logging.LogUtils;
036: import org.apache.cxf.endpoint.Client;
037: import org.apache.cxf.endpoint.ClientLifeCycleListener;
038: import org.apache.cxf.endpoint.ClientLifeCycleManager;
039: import org.apache.cxf.endpoint.Endpoint;
040: import org.apache.cxf.endpoint.Server;
041: import org.apache.cxf.endpoint.ServerLifeCycleListener;
042: import org.apache.cxf.endpoint.ServerLifeCycleManager;
043: import org.apache.cxf.message.Exchange;
044: import org.apache.cxf.message.ExchangeImpl;
045: import org.apache.cxf.message.Message;
046: import org.apache.cxf.message.MessageImpl;
047: import org.apache.cxf.service.Service;
048: import org.apache.cxf.service.model.BindingInfo;
049: import org.apache.cxf.service.model.InterfaceInfo;
050: import org.apache.cxf.service.model.ServiceInfo;
051: import org.apache.cxf.transport.Conduit;
052: import org.apache.cxf.ws.addressing.AddressingProperties;
053: import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
054: import org.apache.cxf.ws.addressing.RelatesToType;
055: import org.apache.cxf.ws.addressing.VersionTransformer;
056: import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
057: import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
058: import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
059: import org.apache.cxf.ws.rm.manager.SourcePolicyType;
060: import org.apache.cxf.ws.rm.persistence.RMMessage;
061: import org.apache.cxf.ws.rm.persistence.RMStore;
062: import org.apache.cxf.ws.rm.policy.RMAssertion;
063: import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
064: import org.apache.cxf.ws.rm.soap.RetransmissionQueueImpl;
065: import org.apache.cxf.ws.rm.soap.SoapFaultFactory;
066:
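/**
 * Manager for reliable messaging endpoints. It registers itself as a bus extension,
 * listens for server and client lifecycle events, and keeps one {@link RMEndpoint}
 * per {@link Endpoint} together with the store, retransmission queue, timer and
 * policy settings it hands out through its accessors.
 */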
070: public class RMManager implements ServerLifeCycleListener,
071: ClientLifeCycleListener {
072:
073: private static final Logger LOG = LogUtils
074: .getL7dLogger(RMManager.class);
075:
076: private Bus bus;
077: private RMStore store;
078: private SequenceIdentifierGenerator idGenerator;
079: private RetransmissionQueue retransmissionQueue;
080: private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
081: private Timer timer = new Timer(true);
082: private RMAssertion rmAssertion;
083: private DeliveryAssuranceType deliveryAssurance;
084: private SourcePolicyType sourcePolicy;
085: private DestinationPolicyType destinationPolicy;
086:
087: // ServerLifeCycleListener
088:
089: public void startServer(Server server) {
090: recoverReliableEndpoint(server.getEndpoint(), null);
091: }
092:
093: public void stopServer(Server server) {
094: shutdownReliableEndpoint(server.getEndpoint());
095: }
096:
097: // ClientLifeCycleListener
098:
099: public void clientCreated(Client client) {
100: recoverReliableEndpoint(client.getEndpoint(), client
101: .getConduit());
102: }
103:
104: public void clientDestroyed(Client client) {
105: shutdownReliableEndpoint(client.getEndpoint());
106: }
107:
108: // Configuration
109:
110: public Bus getBus() {
111: return bus;
112: }
113:
114: @Resource
115: public void setBus(Bus b) {
116: bus = b;
117: }
118:
119: @PostConstruct
120: public void register() {
121: if (null != bus) {
122: bus.setExtension(this , RMManager.class);
123: }
124: }
125:
126: public RMStore getStore() {
127: return store;
128: }
129:
130: public void setStore(RMStore s) {
131: store = s;
132: }
133:
134: public RetransmissionQueue getRetransmissionQueue() {
135: return retransmissionQueue;
136: }
137:
138: public void setRetransmissionQueue(RetransmissionQueue rq) {
139: retransmissionQueue = rq;
140: }
141:
142: public SequenceIdentifierGenerator getIdGenerator() {
143: return idGenerator;
144: }
145:
146: public void setIdGenerator(SequenceIdentifierGenerator generator) {
147: idGenerator = generator;
148: }
149:
150: public Timer getTimer() {
151: return timer;
152: }
153:
154: public BindingFaultFactory getBindingFaultFactory(Binding binding) {
155: return new SoapFaultFactory(binding);
156: }
157:
158: /**
159: * @return Returns the deliveryAssurance.
160: */
161: public DeliveryAssuranceType getDeliveryAssurance() {
162: return deliveryAssurance;
163: }
164:
165: /**
166: * @param deliveryAssurance The deliveryAssurance to set.
167: */
168: public void setDeliveryAssurance(
169: DeliveryAssuranceType deliveryAssurance) {
170: this .deliveryAssurance = deliveryAssurance;
171: }
172:
173: /**
174: * @return Returns the destinationPolicy.
175: */
176: public DestinationPolicyType getDestinationPolicy() {
177: return destinationPolicy;
178: }
179:
180: /**
181: * @param destinationPolicy The destinationPolicy to set.
182: */
183: public void setDestinationPolicy(
184: DestinationPolicyType destinationPolicy) {
185: this .destinationPolicy = destinationPolicy;
186: }
187:
188: /**
189: * @return Returns the rmAssertion.
190: */
191: public RMAssertion getRMAssertion() {
192: return rmAssertion;
193: }
194:
195: /**
196: * @param rma The rmAssertion to set.
197: */
198: public void setRMAssertion(RMAssertion rma) {
199: org.apache.cxf.ws.rm.policy.ObjectFactory factory = new org.apache.cxf.ws.rm.policy.ObjectFactory();
200: if (null == rma) {
201: rma = factory.createRMAssertion();
202: rma.setExponentialBackoff(factory
203: .createRMAssertionExponentialBackoff());
204: }
205: BaseRetransmissionInterval bri = rma
206: .getBaseRetransmissionInterval();
207: if (null == bri) {
208: bri = factory.createRMAssertionBaseRetransmissionInterval();
209: rma.setBaseRetransmissionInterval(bri);
210: }
211: if (null == bri.getMilliseconds()) {
212: bri
213: .setMilliseconds(new BigInteger(
214: RetransmissionQueue.DEFAULT_BASE_RETRANSMISSION_INTERVAL));
215: }
216:
217: rmAssertion = rma;
218: }
219:
220: /**
221: * @return Returns the sourcePolicy.
222: */
223: public SourcePolicyType getSourcePolicy() {
224: return sourcePolicy;
225: }
226:
227: /**
228: * @param sp The sourcePolicy to set.
229: */
230: public void setSourcePolicy(SourcePolicyType sp) {
231: org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
232: if (null == sp) {
233: sp = factory.createSourcePolicyType();
234: }
235: if (!sp.isSetSequenceTerminationPolicy()) {
236: sp.setSequenceTerminationPolicy(factory
237: .createSequenceTerminationPolicyType());
238: }
239: sourcePolicy = sp;
240: }
241:
242: // The real stuff ...
243:
244: public synchronized RMEndpoint getReliableEndpoint(Message message) {
245: Endpoint endpoint = message.getExchange().get(Endpoint.class);
246: QName name = endpoint.getEndpointInfo().getName();
247: if (LOG.isLoggable(Level.FINE)) {
248: LOG.fine("Getting RMEndpoint for endpoint with info: "
249: + name);
250: }
251: if (name.equals(RMConstants.getPortName())) {
252: WrappedEndpoint wrappedEndpoint = (WrappedEndpoint) endpoint;
253: endpoint = wrappedEndpoint.getWrappedEndpoint();
254: }
255: RMEndpoint rme = reliableEndpoints.get(endpoint);
256: if (null == rme) {
257: rme = createReliableEndpoint(endpoint);
258: org.apache.cxf.transport.Destination destination = message
259: .getExchange().getDestination();
260: org.apache.cxf.ws.addressing.EndpointReferenceType replyTo = null;
261: if (null != destination) {
262: AddressingPropertiesImpl maps = RMContextUtils
263: .retrieveMAPs(message, false, false);
264: replyTo = maps.getReplyTo();
265: }
266: rme.initialise(message.getExchange().getConduit(message),
267: replyTo);
268: reliableEndpoints.put(endpoint, rme);
269: LOG.fine("Created new RMEndpoint.");
270: }
271: return rme;
272: }
273:
274: public Destination getDestination(Message message) {
275: RMEndpoint rme = getReliableEndpoint(message);
276: if (null != rme) {
277: return rme.getDestination();
278: }
279: return null;
280: }
281:
282: public Source getSource(Message message) {
283: RMEndpoint rme = getReliableEndpoint(message);
284: if (null != rme) {
285: return rme.getSource();
286: }
287: return null;
288: }
289:
290: public SourceSequence getSequence(Identifier inSeqId,
291: Message message, AddressingProperties maps)
292: throws SequenceFault, RMException {
293:
294: Source source = getSource(message);
295: SourceSequence seq = source.getCurrent(inSeqId);
296: if (null == seq) {
297: // TODO: better error handling
298: org.apache.cxf.ws.addressing.EndpointReferenceType to = null;
299: boolean isServer = RMContextUtils.isServerSide(message);
300: EndpointReferenceType acksTo = null;
301: RelatesToType relatesTo = null;
302: if (isServer) {
303:
304: AddressingPropertiesImpl inMaps = RMContextUtils
305: .retrieveMAPs(message, false, false);
306: inMaps
307: .exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
308: acksTo = RMUtils.createReference2004(inMaps.getTo()
309: .getValue());
310: to = inMaps.getReplyTo();
311: source.getReliableEndpoint().getServant()
312: .setUnattachedIdentifier(inSeqId);
313: relatesTo = (new org.apache.cxf.ws.addressing.ObjectFactory())
314: .createRelatesToType();
315: Destination destination = getDestination(message);
316: DestinationSequence inSeq = inSeqId == null ? null
317: : destination.getSequence(inSeqId);
318: relatesTo.setValue(inSeq != null ? inSeq
319: .getCorrelationID() : null);
320:
321: } else {
322: to = RMUtils.createReference(maps.getTo().getValue());
323: acksTo = VersionTransformer.convert(maps.getReplyTo());
324: if (RMConstants.getNoneAddress().equals(
325: acksTo.getAddress().getValue())) {
326: org.apache.cxf.transport.Destination dest = message
327: .getExchange().getConduit(message)
328: .getBackChannel();
329: if (null == dest) {
330: acksTo = RMUtils.createAnonymousReference2004();
331: } else {
332: acksTo = VersionTransformer.convert(dest
333: .getAddress());
334: }
335: }
336: }
337:
338: Proxy proxy = source.getReliableEndpoint().getProxy();
339: CreateSequenceResponseType createResponse = proxy
340: .createSequence(acksTo, relatesTo, isServer);
341: if (!isServer) {
342: Servant servant = source.getReliableEndpoint()
343: .getServant();
344: servant.createSequenceResponse(createResponse);
345: }
346:
347: seq = source.awaitCurrent(inSeqId);
348: seq.setTarget(to);
349: }
350:
351: return seq;
352: }
353:
354: @PreDestroy
355: public void shutdown() {
356: // shutdown remaining endpoints
357:
358: LOG
359: .log(
360: Level.FINE,
361: "Shutting down RMManager with {0} remaining endpoints.",
362: reliableEndpoints.size());
363: for (RMEndpoint rme : reliableEndpoints.values()) {
364: rme.shutdown();
365: }
366:
367: // remove references to timer tasks cancelled above to make them
368: // eligible for garbage collection
369: timer.purge();
370: timer.cancel();
371: }
372:
373: synchronized void shutdownReliableEndpoint(Endpoint e) {
374: RMEndpoint rme = reliableEndpoints.get(e);
375: if (null == rme) {
376: // not interested
377: return;
378: }
379:
380: rme.shutdown();
381:
382: // remove references to timer tasks cancelled above to make them
383: // eligible for garbage collection
384: timer.purge();
385:
386: reliableEndpoints.remove(e);
387: }
388:
389: void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit) {
390: if (null == store || null == retransmissionQueue) {
391: return;
392: }
393:
394: String id = RMUtils.getEndpointIdentifier(endpoint);
395:
396: Collection<SourceSequence> sss = store.getSourceSequences(id);
397: if (null == sss || 0 == sss.size()) {
398: return;
399: }
400: LOG.log(Level.FINE, "Number of source sequences: {0}", sss
401: .size());
402:
403: RMEndpoint rme = null;
404:
405: for (SourceSequence ss : sss) {
406:
407: Collection<RMMessage> ms = store.getMessages(ss
408: .getIdentifier(), true);
409: if (null == ms || 0 == ms.size()) {
410: continue;
411: }
412: LOG.log(Level.FINE, "Number of messages in sequence: {0}",
413: ms.size());
414:
415: if (null == rme) {
416: LOG.log(Level.FINE,
417: "Recovering {0} endpoint with id: {1}",
418: new Object[] {
419: null == conduit ? "client" : "server",
420: id });
421: rme = createReliableEndpoint(endpoint);
422: rme.initialise(conduit, null);
423: reliableEndpoints.put(endpoint, rme);
424: }
425: rme.getSource().addSequence(ss, false);
426:
427: for (RMMessage m : ms) {
428:
429: Message message = new MessageImpl();
430: Exchange exchange = new ExchangeImpl();
431: message.setExchange(exchange);
432: if (null != conduit) {
433: exchange.setConduit(conduit);
434: message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
435: }
436: exchange.put(Endpoint.class, endpoint);
437: exchange.put(Service.class, endpoint.getService());
438: if (endpoint.getEndpointInfo().getService() != null) {
439: exchange.put(ServiceInfo.class, endpoint
440: .getEndpointInfo().getService());
441: exchange.put(InterfaceInfo.class, endpoint
442: .getEndpointInfo().getService()
443: .getInterface());
444: }
445: exchange.put(Binding.class, endpoint.getBinding());
446: exchange.put(BindingInfo.class, endpoint
447: .getEndpointInfo().getBinding());
448: exchange.put(Bus.class, bus);
449:
450: SequenceType st = RMUtils.getWSRMFactory()
451: .createSequenceType();
452: st.setIdentifier(ss.getIdentifier());
453: st.setMessageNumber(m.getMessageNumber());
454: if (ss.isLastMessage()
455: && ss.getCurrentMessageNr().equals(
456: m.getMessageNumber())) {
457: st.setLastMessage(RMUtils.getWSRMFactory()
458: .createSequenceTypeLastMessage());
459: }
460: RMProperties rmps = new RMProperties();
461: rmps.setSequence(st);
462: RMContextUtils.storeRMProperties(message, rmps, true);
463: if (null == conduit) {
464: String to = m.getTo();
465: AddressingProperties maps = new AddressingPropertiesImpl();
466: maps.setTo(RMUtils.createReference(to));
467: RMContextUtils
468: .storeMAPs(maps, message, true, false);
469: }
470:
471: message.setContent(byte[].class, m.getContent());
472:
473: retransmissionQueue.addUnacknowledged(message);
474: }
475: }
476: retransmissionQueue.start();
477:
478: }
479:
480: RMEndpoint createReliableEndpoint(Endpoint endpoint) {
481: return new RMEndpoint(this , endpoint);
482: }
483:
484: @PostConstruct
485: void initialise() {
486: if (null == rmAssertion) {
487: setRMAssertion(null);
488: }
489: org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
490: if (null == deliveryAssurance) {
491: DeliveryAssuranceType da = factory
492: .createDeliveryAssuranceType();
493: da.setAtLeastOnce(factory
494: .createDeliveryAssuranceTypeAtLeastOnce());
495: setDeliveryAssurance(da);
496: }
497: if (null == sourcePolicy) {
498: setSourcePolicy(null);
499:
500: }
501: if (null == destinationPolicy) {
502: DestinationPolicyType dp = factory
503: .createDestinationPolicyType();
504: dp.setAcksPolicy(factory.createAcksPolicyType());
505: setDestinationPolicy(dp);
506: }
507: if (null == retransmissionQueue) {
508: retransmissionQueue = new RetransmissionQueueImpl(this );
509: }
510: if (null == idGenerator) {
511: idGenerator = new DefaultSequenceIdentifierGenerator();
512: }
513: }
514:
515: @PostConstruct
516: void registerListeners() {
517: if (null == bus) {
518: return;
519: }
520: ServerLifeCycleManager slm = bus
521: .getExtension(ServerLifeCycleManager.class);
522: if (null != slm) {
523: slm.registerListener(this );
524: }
525: ClientLifeCycleManager clm = bus
526: .getExtension(ClientLifeCycleManager.class);
527: if (null != clm) {
528: clm.registerListener(this );
529: }
530: }
531:
532: Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
533: return reliableEndpoints;
534: }
535:
536: void setReliableEndpointsMap(Map<Endpoint, RMEndpoint> map) {
537: reliableEndpoints = map;
538: }
539:
540: class DefaultSequenceIdentifierGenerator implements
541: SequenceIdentifierGenerator {
542:
543: public Identifier generateSequenceIdentifier() {
544: String sequenceID = RMContextUtils.generateUUID();
545: Identifier sid = RMUtils.getWSRMFactory()
546: .createIdentifier();
547: sid.setValue(sequenceID);
548: return sid;
549: }
550: }
551:
552: }
|