001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.async.impl;
006:
007: import com.tc.async.api.AddPredicate;
008: import com.tc.async.api.EventContext;
009: import com.tc.async.api.OrderedEventContext;
010: import com.tc.async.api.Sink;
011: import com.tc.logging.TCLogger;
012: import com.tc.stats.Stats;
013:
014: import java.util.Collection;
015: import java.util.Comparator;
016: import java.util.Iterator;
017: import java.util.List;
018: import java.util.SortedSet;
019: import java.util.TreeSet;
020:
021: /**
022: * This class provides an order to the events processed. If events are added out of order, this class orderes them
023: * before adding it to the destination sink. If Messages went missing, then this class waits till the missing message
024: * arrives before pushing the events to the destination sink.
025: */
026: public class OrderedSink implements Sink {
027:
028: private final Sink sink;
029: private final TCLogger logger;
030:
031: private long current = 0;
032: private SortedSet pending = new TreeSet(new Comparator() {
033: public int compare(Object o1, Object o2) {
034: long s1 = ((OrderedEventContext) o1).getSequenceID();
035: long s2 = ((OrderedEventContext) o2).getSequenceID();
036: if (s1 < s2)
037: return -1;
038: else if (s1 == s2)
039: return 0;
040: else
041: return 1;
042: }
043: });
044: private AddPredicate predicate;
045:
046: public OrderedSink(TCLogger logger, Sink sink) {
047: this .logger = logger;
048: this .sink = sink;
049: this .predicate = DefaultAddPredicate.getInstance();
050: }
051:
052: public synchronized void add(EventContext context) {
053: if (!predicate.accept(context)) {
054: logger
055: .warn("Predicate forced to ignore message "
056: + context);
057: return;
058: }
059: OrderedEventContext oc = (OrderedEventContext) context;
060: long seq = oc.getSequenceID();
061: if (seq <= current) {
062: throw new AssertionError(
063: "Received Event with a sequence less than the current sequence. Current = "
064: + current + " Seq Id = " + seq
065: + " Event = " + oc);
066: } else if (seq == current + 1) {
067: current = seq;
068: sink.add(context);
069: processPendingIfNecessary();
070: } else {
071: pending.add(oc);
072: if (pending.size() % 10 == 0) {
073: logger
074: .warn(pending.size()
075: + " messages in pending queue. Message with ID "
076: + (current + 1) + " is missing still");
077: }
078: }
079: }
080:
081: private void processPendingIfNecessary() {
082: if (!pending.isEmpty()) {
083: for (Iterator i = pending.iterator(); i.hasNext();) {
084: OrderedEventContext oc = (OrderedEventContext) i.next();
085: long seq = oc.getSequenceID();
086: if (seq == current + 1) {
087: current = seq;
088: sink.add(oc);
089: i.remove();
090: } else {
091: break;
092: }
093: }
094: }
095: }
096:
097: /**
098: * this implementation isnt really lossy.
099: */
100: public boolean addLossy(EventContext context) {
101: add(context);
102: return true;
103: }
104:
105: public void addMany(Collection contexts) {
106: for (Iterator i = contexts.iterator(); i.hasNext();) {
107: EventContext ec = (EventContext) i.next();
108: add(ec);
109: }
110: }
111:
112: public synchronized void clear() {
113: pending.clear();
114: current = 0;
115: sink.clear();
116: }
117:
118: public synchronized AddPredicate getPredicate() {
119: return predicate;
120: }
121:
122: public void pause(List pauseEvents) {
123: clear();
124: sink.pause(pauseEvents);
125: }
126:
127: public synchronized void setAddPredicate(AddPredicate ap) {
128: this .predicate = ap;
129: }
130:
131: public int size() {
132: return sink.size();
133: }
134:
135: public void unpause() {
136: sink.unpause();
137: }
138:
139: public void enableStatsCollection(boolean enable) {
140: sink.enableStatsCollection(enable);
141: }
142:
143: public Stats getStats(long frequency) {
144: return sink.getStats(frequency);
145: }
146:
147: public Stats getStatsAndReset(long frequency) {
148: return sink.getStatsAndReset(frequency);
149: }
150:
151: public boolean isStatsCollectionEnabled() {
152: return sink.isStatsCollectionEnabled();
153: }
154:
155: public void resetStats() {
156: sink.resetStats();
157: }
158: }
|