001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.core;
026:
027: import seda.sandStorm.api.*;
028: import seda.util.*;
029:
030: /**
031: * This enqueue predicate implements input rate policing.
032: */
033: public class RateLimitingPredicate implements EnqueuePredicateIF {
034:
035: private static final boolean DEBUG = false;
036:
037: private SinkIF thesink;
038: private double targetRate;
039: private int depth;
040: private double tokenCount;
041: private double regenTimeMS;
042: private long lasttime;
043:
044: // Number of milliseconds between regenerations
045: private long MIN_REGEN_TIME = 0;
046:
047: private static final boolean PROFILE = true;
048: private StatsGatherer interArrivalStats;
049: private StatsGatherer acceptArrivalStats;
050:
051: /**
052: * Create a new RateLimitingPredicate for the given sink,
053: * targetRate, and token bucket depth. A rate of -1.0 indicates no rate limit.
054: */
055: public RateLimitingPredicate(SinkIF sink, double targetRate,
056: int depth) {
057: this .thesink = sink;
058: this .targetRate = targetRate;
059: this .regenTimeMS = (1.0 / targetRate) * 1.0e3;
060: if (this .regenTimeMS < 1)
061: this .regenTimeMS = 1;
062: this .depth = depth;
063: this .tokenCount = depth * 1.0;
064: this .lasttime = System.currentTimeMillis();
065:
066: System.err.println("RateLimitingPredicate<" + sink.toString()
067: + ">: Created");
068:
069: if (PROFILE) {
070: interArrivalStats = new StatsGatherer("IA<"
071: + sink.toString() + ">", "IA<" + sink.toString()
072: + ">", 1, 0);
073: acceptArrivalStats = new StatsGatherer("AA<"
074: + sink.toString() + ">", "AA<" + sink.toString()
075: + ">", 1, 0);
076: }
077: }
078:
079: /**
080: * Returns true if the given element can be accepted into the queue.
081: */
082: public boolean accept(QueueElementIF qel) {
083: if (targetRate == -1.0)
084: return true;
085:
086: // First regenerate tokens
087: long curtime = System.currentTimeMillis();
088: long delay = curtime - lasttime;
089:
090: if (PROFILE) {
091: interArrivalStats.add(delay);
092: }
093:
094: if (delay >= MIN_REGEN_TIME) {
095: double numTokens = ((double) delay * 1.0)
096: / (regenTimeMS * 1.0);
097: tokenCount += numTokens;
098: if (tokenCount > depth)
099: tokenCount = depth;
100: lasttime = curtime;
101: }
102:
103: if (tokenCount >= 1.0) {
104: tokenCount -= 1.0;
105: if (PROFILE) {
106: acceptArrivalStats.add(delay);
107: }
108: return true;
109: } else {
110: return false;
111: }
112: }
113:
114: /**
115: * Return the current rate limit.
116: */
117: public double getTargetRate() {
118: return targetRate;
119: }
120:
121: /**
122: * Return the current depth.
123: */
124: public int getDepth() {
125: return depth;
126: }
127:
128: /**
129: * Return the number of tokens currently in the bucket.
130: */
131: public int getBucketSize() {
132: return (int) tokenCount;
133: }
134:
135: /**
136: * Set the rate limit. A limit of -1.0 indicates no rate limit.
137: */
138: public void setTargetRate(double targetRate) {
139: this .targetRate = targetRate;
140: this .regenTimeMS = (1.0 / targetRate) * 1.0e3;
141: if (regenTimeMS < 1)
142: regenTimeMS = 1;
143: }
144:
145: /**
146: * Set the bucket depth.
147: */
148: public void setDepth(int depth) {
149: this.depth = depth;
150: }
151:
152: }
|