001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.async.impl;
006:
007: import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
008: import EDU.oswego.cs.dl.util.concurrent.Channel;
009: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
010:
011: import com.tc.async.api.AddPredicate;
012: import com.tc.async.api.EventContext;
013: import com.tc.async.api.Sink;
014: import com.tc.async.api.Source;
015: import com.tc.exception.TCRuntimeException;
016: import com.tc.logging.TCLogger;
017: import com.tc.logging.TCLoggerProvider;
018: import com.tc.stats.Stats;
019: import com.tc.util.Assert;
020: import com.tc.util.State;
021:
022: import java.util.Collection;
023: import java.util.Iterator;
024: import java.util.LinkedList;
025: import java.util.List;
026:
027: /**
028: * The beginnings of an implementation of our SEDA like framework. This Is part of an impl for the queue
029: *
030: * @author steve
031: */
032: public class StageQueueImpl implements Sink, Source {
033:
034: private static final State RUNNING = new State("RUNNING");
035: private static final State PAUSED = new State("PAUSED");
036:
037: private final Channel queue;
038: private final String stage;
039: private final TCLogger logger;
040: private AddPredicate predicate = DefaultAddPredicate.getInstance();
041: private volatile State state = RUNNING;
042: private volatile StageQueueStatsCollector statsCollector;
043:
044: public StageQueueImpl(TCLoggerProvider loggerProvider,
045: String stage, Channel queue) {
046: this .queue = queue;
047: this .logger = loggerProvider.getLogger(Sink.class.getName()
048: + ": " + stage);
049: this .stage = stage;
050: this .statsCollector = new NullStageQueueStatsCollector(stage);
051: }
052:
053: /**
054: * The context will be added if the sink was found to be empty(at somepoint during the call). If the queue was not
055: * empty (at somepoint during the call) the context might not be added. This method should only be used where the
056: * stage threads are to be signaled on data availablity and the threads take care of getting data from elsewhere
057: */
058: public boolean addLossy(EventContext context) {
059: if (isEmpty()) {
060: add(context);
061: return true;
062: } else {
063: return false;
064: }
065: }
066:
067: // XXX::Ugly hack since this method doesnt exist on the Channel interface
068: private boolean isEmpty() {
069: if (queue instanceof BoundedLinkedQueue) {
070: return ((BoundedLinkedQueue) queue).isEmpty();
071: } else if (queue instanceof LinkedQueue) {
072: return ((LinkedQueue) queue).isEmpty();
073: } else {
074: throw new AssertionError("Unsupported channel "
075: + queue.getClass().getName() + " in "
076: + getClass().getName());
077: }
078: }
079:
080: public void addMany(Collection contexts) {
081: if (logger.isDebugEnabled())
082: logger.debug("Added many:" + contexts + " to:" + stage);
083: for (Iterator i = contexts.iterator(); i.hasNext();) {
084: add((EventContext) i.next());
085: }
086: }
087:
088: public void add(EventContext context) {
089: Assert.assertNotNull(context);
090: if (state == PAUSED) {
091: logger.info("Ignoring event while PAUSED: " + context);
092: return;
093: }
094:
095: if (logger.isDebugEnabled())
096: logger.debug("Added:" + context + " to:" + stage);
097: if (!predicate.accept(context)) {
098: if (logger.isDebugEnabled())
099: logger.debug("Predicate caused skip add for:" + context
100: + " to:" + stage);
101: return;
102: }
103:
104: statsCollector.contextAdded();
105: try {
106: queue.put(context);
107: } catch (InterruptedException e) {
108: // TODO Auto-generated catch block
109: e.printStackTrace();
110: }
111:
112: }
113:
114: public EventContext get() throws InterruptedException {
115: return poll(Long.MAX_VALUE);
116: }
117:
118: public EventContext poll(long period) throws InterruptedException {
119: EventContext rv = (EventContext) queue.poll(period);
120: if (rv != null)
121: statsCollector.contextRemoved();
122: return rv;
123: }
124:
125: // Used for testing
126: public int size() {
127: if (queue instanceof BoundedLinkedQueue) {
128: return ((BoundedLinkedQueue) queue).size();
129: } else {
130: return 0;
131: }
132: }
133:
134: public Collection getAll() throws InterruptedException {
135: List l = new LinkedList();
136: l.add(queue.take());
137: while (true) {
138: Object o = queue.poll(0);
139: if (o == null) {
140: // could be a little off
141: statsCollector.reset();
142: break;
143: } else {
144: l.add(o);
145: }
146: }
147: return l;
148: }
149:
150: public void setAddPredicate(AddPredicate predicate) {
151: Assert.eval(predicate != null);
152: this .predicate = predicate;
153: }
154:
155: public AddPredicate getPredicate() {
156: return predicate;
157: }
158:
159: public String toString() {
160: return "StageQueue(" + stage + ")";
161: }
162:
163: public void clear() {
164: try {
165: // XXX: poor man's clear.
166: int clearCount = 0;
167: while (poll(0) != null) { // calling this.poll() to get counter updated
168: /* supress no-body warning */
169: clearCount++;
170: }
171: statsCollector.reset();
172: logger.info("Cleared " + clearCount);
173: } catch (InterruptedException e) {
174: throw new TCRuntimeException(e);
175: }
176: }
177:
178: public void pause(List pauseEvents) {
179: if (state != RUNNING)
180: throw new AssertionError(
181: "Attempt to pause while not running: " + state);
182: state = PAUSED;
183: clear();
184: for (Iterator i = pauseEvents.iterator(); i.hasNext();) {
185: try {
186: queue.put(i.next());
187: statsCollector.contextAdded();
188: } catch (InterruptedException e) {
189: throw new TCRuntimeException(e);
190: }
191: }
192: }
193:
194: public void unpause() {
195: if (state != PAUSED)
196: throw new AssertionError(
197: "Attempt to unpause while not paused: " + state);
198: state = RUNNING;
199: }
200:
201: /*********************************************************************************************************************
202: * Monitorable Interface
203: */
204:
205: public void enableStatsCollection(boolean enable) {
206: if (enable) {
207: statsCollector = new StageQueueStatsCollectorImpl(stage);
208: } else {
209: statsCollector = new NullStageQueueStatsCollector(stage);
210: }
211: }
212:
213: public Stats getStats(long frequency) {
214: return statsCollector;
215: }
216:
217: public Stats getStatsAndReset(long frequency) {
218: return getStats(frequency);
219: }
220:
221: public boolean isStatsCollectionEnabled() {
222: return statsCollector instanceof StageQueueStatsCollectorImpl;
223: }
224:
225: public void resetStats() {
226: // NOP
227: }
228:
229: public static abstract class StageQueueStatsCollector implements
230: Stats {
231:
232: public void logDetails(TCLogger statsLogger) {
233: statsLogger.info(getDetails());
234: }
235:
236: public abstract void contextAdded();
237:
238: public abstract void reset();
239:
240: public abstract void contextRemoved();
241:
242: protected String makeWidth(String name, int width) {
243: final int len = name.length();
244: if (len == width) {
245: return name;
246: }
247: if (len > width) {
248: return name.substring(0, width);
249: }
250:
251: StringBuffer buf = new StringBuffer(name);
252: for (int i = len; i < width; i++) {
253: buf.append(' ');
254: }
255: return buf.toString();
256: }
257: }
258:
259: public static class NullStageQueueStatsCollector extends
260: StageQueueStatsCollector {
261:
262: private String name;
263:
264: public NullStageQueueStatsCollector(String stage) {
265: this .name = makeWidth(stage, 40);
266: }
267:
268: public String getDetails() {
269: return name + " : Not Monitored";
270: }
271:
272: public void contextAdded() {
273: // NOOP
274: }
275:
276: public void contextRemoved() {
277: // NOOP
278: }
279:
280: public void reset() {
281: // NOOP
282: }
283: }
284:
285: public static class StageQueueStatsCollectorImpl extends
286: StageQueueStatsCollector {
287:
288: private int count = 0;
289: private String name;
290:
291: public StageQueueStatsCollectorImpl(String stage) {
292: this .name = makeWidth(stage, 40);
293: }
294:
295: public synchronized String getDetails() {
296: return name + " : " + count;
297: }
298:
299: public synchronized void contextAdded() {
300: count++;
301: }
302:
303: public synchronized void contextRemoved() {
304: count--;
305: }
306:
307: public synchronized void reset() {
308: count = 0;
309: }
310: }
311:
312: }
|