001: package org.objectweb.celtix.bus.ws.rm;
002:
003: import java.io.IOException;
004: import java.util.ArrayList;
005: import java.util.Collection;
006: import java.util.HashMap;
007: import java.util.Map;
008: import java.util.concurrent.locks.Condition;
009: import java.util.concurrent.locks.Lock;
010: import java.util.concurrent.locks.ReentrantLock;
011: import java.util.logging.Level;
012: import java.util.logging.Logger;
013:
014: import org.objectweb.celtix.Bus;
015: import org.objectweb.celtix.bus.configuration.wsrm.SequenceTerminationPolicyType;
016: import org.objectweb.celtix.bus.configuration.wsrm.SourcePolicyType;
017: import org.objectweb.celtix.buslifecycle.BusLifeCycleListener;
018: import org.objectweb.celtix.common.i18n.Message;
019: import org.objectweb.celtix.common.logging.LogUtils;
020: import org.objectweb.celtix.context.ObjectMessageContext;
021: import org.objectweb.celtix.ws.rm.Identifier;
022: import org.objectweb.celtix.ws.rm.SequenceAcknowledgement;
023: import org.objectweb.celtix.ws.rm.persistence.RMMessage;
024: import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence;
025: import org.objectweb.celtix.ws.rm.persistence.RMStore;
026:
027: public class RMSource extends RMEndpoint {
028:
029: private static final Logger LOG = LogUtils
030: .getL7dLogger(RMSource.class);
031: private static final String SOURCE_POLICIES_PROPERTY_NAME = "sourcePolicies";
032: private static final String REQUESTOR_SEQUENCE_ID = "";
033:
034: private Map<String, SourceSequence> map;
035: private Map<String, SourceSequence> current;
036: private final RetransmissionQueue retransmissionQueue;
037: private Lock sequenceCreationLock;
038: private Condition sequenceCreationCondition;
039: private boolean sequenceCreationNotified;
040:
041: RMSource(RMHandler h) {
042: super (h);
043: map = new HashMap<String, SourceSequence>();
044: Bus bus = h.getBus();
045: bus.getLifeCycleManager().registerLifeCycleListener(
046: new BusLifeCycleListener() {
047: public void initComplete() {
048: }
049:
050: public void postShutdown() {
051: }
052:
053: public void preShutdown() {
054: shutdown();
055: }
056: });
057: current = new HashMap<String, SourceSequence>();
058:
059: retransmissionQueue = new RetransmissionQueue(h,
060: getRMAssertion());
061: sequenceCreationLock = new ReentrantLock();
062: sequenceCreationCondition = sequenceCreationLock.newCondition();
063: }
064:
065: public SourceSequence getSequence(Identifier id) {
066: return map.get(id.getValue());
067: }
068:
069: public void addSequence(SourceSequence seq) {
070: addSequence(seq, true);
071: }
072:
073: public void addSequence(SourceSequence seq, boolean persist) {
074: LOG.fine("Adding source sequence: " + seq);
075: seq.setSource(this );
076: map.put(seq.getIdentifier().getValue(), seq);
077: if (persist) {
078: getHandler().getStore().createSourceSequence(seq);
079: }
080: }
081:
082: public void removeSequence(SourceSequence seq) {
083: map.remove(seq.getIdentifier().getValue());
084: getHandler().getStore().removeSourceSequence(
085: seq.getIdentifier());
086: }
087:
088: public final Collection<SourceSequence> getAllSequences() {
089: return map.values();
090: }
091:
092: public SourcePolicyType getSourcePolicies() {
093: SourcePolicyType sp = (SourcePolicyType) getHandler()
094: .getConfiguration().getObject(SourcePolicyType.class,
095: SOURCE_POLICIES_PROPERTY_NAME);
096: if (null == sp) {
097: sp = RMUtils.getWSRMConfFactory().createSourcePolicyType();
098: }
099: return sp;
100: }
101:
102: public SequenceTerminationPolicyType getSequenceTerminationPolicy() {
103: SourcePolicyType sp = getSourcePolicies();
104: assert null != sp;
105: SequenceTerminationPolicyType stp = sp
106: .getSequenceTerminationPolicy();
107: if (null == stp) {
108: stp = RMUtils.getWSRMConfFactory()
109: .createSequenceTerminationPolicyType();
110: }
111: return stp;
112: }
113:
114: public RetransmissionQueue getRetransmissionQueue() {
115: return retransmissionQueue;
116: }
117:
118: /**
119: * Returns the current sequence used by a client side source.
120: *
121: * @return the current sequence.
122: */
123: SourceSequence getCurrent() {
124: return getCurrent(null);
125: }
126:
127: /**
128: * Sets the current sequence used by a client side source.
129: * @param s the current sequence.
130: */
131: void setCurrent(SourceSequence s) {
132: setCurrent(null, s);
133: }
134:
135: /**
136: * Returns the current sequence used by a server side source for responses to a message
137: * sent as part of the inbound sequence with the specified identifier.
138: *
139: * @return the current sequence.
140: */
141: SourceSequence getCurrent(Identifier i) {
142: sequenceCreationLock.lock();
143: try {
144: return getAssociatedSequence(i);
145: } finally {
146: sequenceCreationLock.unlock();
147: }
148: }
149:
150: /**
151: * Returns the sequence associated with the given identifier.
152: *
153: * @param i the corresponding sequence identifier
154: * @return the associated sequence
155: * @pre the sequenceCreationLock is already held
156: */
157: SourceSequence getAssociatedSequence(Identifier i) {
158: return current.get(i == null ? REQUESTOR_SEQUENCE_ID : i
159: .getValue());
160: }
161:
162: /**
163: * Await the avilability of a sequence corresponding to the given identifier.
164: *
165: * @param i the sequnce identifier
166: * @return
167: */
168: SourceSequence awaitCurrent(Identifier i) {
169: sequenceCreationLock.lock();
170: try {
171: SourceSequence seq = getAssociatedSequence(i);
172: while (seq == null) {
173: while (!sequenceCreationNotified) {
174: try {
175: sequenceCreationCondition.await();
176: } catch (InterruptedException ie) {
177: // ignore
178: }
179: }
180: seq = getAssociatedSequence(i);
181: }
182: return seq;
183: } finally {
184: sequenceCreationLock.unlock();
185: }
186: }
187:
188: /**
189: * Sets the current sequence used by a server side source for responses to a message
190: * sent as part of the inbound sequence with the specified identifier.
191: * @param s the current sequence.
192: */
193: void setCurrent(Identifier i, SourceSequence s) {
194: sequenceCreationLock.lock();
195: try {
196: current.put(i == null ? REQUESTOR_SEQUENCE_ID : i
197: .getValue(), s);
198: sequenceCreationNotified = true;
199: sequenceCreationCondition.signal();
200: } finally {
201: sequenceCreationLock.unlock();
202: }
203: }
204:
205: /**
206: * Create a copy of the message, store it in the retransmission queue and
207: * schedule the next transmission
208: *
209: * @param context
210: */
211: public void addUnacknowledged(SourceSequence seq, RMMessage msg) {
212: ObjectMessageContext clone = getHandler().getBinding()
213: .createObjectContext();
214: clone.putAll(msg.getContext());
215: getRetransmissionQueue().cacheUnacknowledged(clone);
216: getHandler().getStore().persistOutgoing(seq, msg);
217: }
218:
219: /**
220: * Stores the received acknowledgment in the Sequence object identified in
221: * the <code>SequenceAcknowldgement</code> parameter. Then purges any
222: * acknowledged messages from the retransmission queue and requests sequence
223: * termination if necessary.
224: *
225: * @param acknowledgment
226: */
227: public void setAcknowledged(SequenceAcknowledgement acknowledgment) {
228: Identifier sid = acknowledgment.getIdentifier();
229: SourceSequence seq = getSequence(sid);
230: if (null != seq) {
231: seq.setAcknowledged(acknowledgment);
232: retransmissionQueue.purgeAcknowledged(seq);
233: if (seq.allAcknowledged()) {
234: try {
235: getHandler().getProxy().terminateSequence(seq);
236: } catch (IOException ex) {
237: Message msg = new Message(
238: "SEQ_TERMINATION_FAILURE", LOG, seq
239: .getIdentifier());
240: LOG.log(Level.SEVERE, msg.toString(), ex);
241: }
242: }
243: }
244: }
245:
246: public void shutdown() {
247: retransmissionQueue.shutdown();
248: }
249:
250: /**
251: * Returns a collection of all sequences for which have not yet been
252: * completely acknowledged.
253: *
254: * @return the collection of unacknowledged sequences.
255: */
256: public Collection<SourceSequence> getAllUnacknowledgedSequences() {
257: Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
258: for (SourceSequence seq : map.values()) {
259: if (!seq.allAcknowledged()) {
260: seqs.add(seq);
261: }
262: }
263: return seqs;
264: }
265:
266: void restore() {
267: RMStore store = getHandler().getStore();
268:
269: Collection<RMSourceSequence> dss = store
270: .getSourceSequences(getEndpointId());
271: // Don't make any of these sequences the current sequence, thus forcing
272: // termination of the recovered sequences as soon as possible
273: for (RMSourceSequence ds : dss) {
274: addSequence((SourceSequence) ds, false);
275: }
276:
277: retransmissionQueue.populate(getAllSequences());
278: int n = retransmissionQueue.getUnacknowledged().size();
279: if (n > 0) {
280: LOG.fine("Recovered " + n
281: + " messages, start retransmission queue now");
282: retransmissionQueue.start(getHandler().getBus()
283: .getWorkQueueManager().getAutomaticWorkQueue());
284: } else {
285: LOG.fine("No outgoing messages recovered");
286: }
287:
288: }
289: }
|