001: package org.objectweb.celtix.bus.ws.rm;
002:
003: import java.io.IOException;
004: import java.util.Collection;
005: import java.util.HashMap;
006: import java.util.Map;
007: import java.util.logging.Level;
008: import java.util.logging.Logger;
009:
010: import org.objectweb.celtix.bus.configuration.wsrm.AcksPolicyType;
011: import org.objectweb.celtix.bus.configuration.wsrm.DestinationPolicyType;
012: import org.objectweb.celtix.common.i18n.Message;
013: import org.objectweb.celtix.common.logging.LogUtils;
014: import org.objectweb.celtix.ws.rm.Identifier;
015: import org.objectweb.celtix.ws.rm.SequenceType;
016: import org.objectweb.celtix.ws.rm.persistence.RMDestinationSequence;
017: import org.objectweb.celtix.ws.rm.persistence.RMStore;
018: import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
019:
020: public class RMDestination extends RMEndpoint {
021:
022: private static final Logger LOG = LogUtils
023: .getL7dLogger(RMDestination.class);
024: private static final String DESTINATION_POLICIES_PROPERTY_NAME = "destinationPolicies";
025:
026: private Map<String, DestinationSequence> map;
027:
028: RMDestination(RMHandler h) {
029: super (h);
030: map = new HashMap<String, DestinationSequence>();
031: }
032:
033: public DestinationSequence getSequence(Identifier id) {
034: return map.get(id.getValue());
035: }
036:
037: public void addSequence(DestinationSequence seq) {
038: addSequence(seq, true);
039: }
040:
041: public void addSequence(DestinationSequence seq, boolean persist) {
042: seq.setDestination(this );
043: map.put(seq.getIdentifier().getValue(), seq);
044: if (persist) {
045: getHandler().getStore().createDestinationSequence(seq);
046: }
047: }
048:
049: public void removeSequence(DestinationSequence seq) {
050: map.remove(seq.getIdentifier().getValue());
051: getHandler().getStore().removeDestinationSequence(
052: seq.getIdentifier());
053: }
054:
055: public Collection<DestinationSequence> getAllSequences() {
056: return map.values();
057: }
058:
059: public DestinationPolicyType getDestinationPolicies() {
060: DestinationPolicyType dp = getHandler().getConfiguration()
061: .getObject(DestinationPolicyType.class,
062: DESTINATION_POLICIES_PROPERTY_NAME);
063: if (null == dp) {
064: dp = RMUtils.getWSRMConfFactory()
065: .createDestinationPolicyType();
066: }
067: return dp;
068: }
069:
070: public AcksPolicyType getAcksPolicy() {
071: DestinationPolicyType dp = getDestinationPolicies();
072: assert null != dp;
073: AcksPolicyType ap = dp.getAcksPolicy();
074: if (null == ap) {
075: ap = RMUtils.getWSRMConfFactory().createAcksPolicyType();
076: }
077: return ap;
078: }
079:
080: /**
081: * Acknowledges receipt of a message. If the message is the last in the sequence,
082: * sends an out-of-band SequenceAcknowledgement unless there a response will be sent
083: * to the acksTo address onto which the acknowldegment can be piggybacked.
084: *
085: * @param sequenceType the sequenceType object that includes identifier and message number
086: * (and possibly a lastMessage element) for the message to be acknowledged)
087: * @param replyToAddress the replyTo address of the message that carried this sequence information
088: * @throws SequenceFault if the sequence specified in <code>sequenceType</code> does not exist
089: */
090: public void acknowledge(SequenceType sequenceType,
091: String replyToAddress) throws SequenceFault {
092: DestinationSequence seq = getSequence(sequenceType
093: .getIdentifier());
094: if (null != seq) {
095: seq.acknowledge(sequenceType.getMessageNumber());
096:
097: if (null != sequenceType.getLastMessage()) {
098:
099: seq.setLastMessageNumber(sequenceType
100: .getMessageNumber());
101:
102: seq.scheduleImmediateAcknowledgement();
103:
104: // if we cannot expect an outgoing message to which the acknowledgement
105: // can be added we need to send an out-of-band SequenceAcknowledgement message
106:
107: if (!(seq.getAcksTo().getAddress().getValue().equals(
108: replyToAddress) || seq
109: .canPiggybackAckOnPartialResponse())) {
110: try {
111: getHandler().getProxy().acknowledge(seq);
112: } catch (IOException ex) {
113: Message msg = new Message("SEQ_ACK_SEND_EXC",
114: LOG, seq);
115: LOG.log(Level.SEVERE, msg.toString(), ex);
116: }
117: }
118: }
119: } else {
120: throw DestinationSequence
121: .createUnknownSequenceFault(sequenceType
122: .getIdentifier());
123: }
124: }
125:
126: void restore() {
127: RMStore store = getHandler().getStore();
128:
129: Collection<RMDestinationSequence> dss = store
130: .getDestinationSequences(getEndpointId());
131: for (RMDestinationSequence ds : dss) {
132: addSequence((DestinationSequence) ds, false);
133: }
134: }
135: }
|