001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.core;
026:
027: import seda.sandStorm.api.*;
028: import seda.util.*;
029:
030: /**
031: * This enqueue predicate implements multiclass input rate policing.
032: */
033: public class MulticlassRateLimitingPredicate implements
034: EnqueuePredicateIF {
035:
036: private static final boolean DEBUG = false;
037:
038: private SinkIF thesink;
039: private int NUM_CLASSES;
040: private double targetRate[];
041: private int depth[];
042: private double tokenCount[];
043: private double regenTimeMS[];
044: private long lasttime[];
045:
046: // Number of milliseconds between regenerations
047: private long MIN_REGEN_TIME = 0;
048:
049: private static final boolean PROFILE = true;
050: private StatsGatherer interArrivalStats;
051: private StatsGatherer acceptArrivalStats;
052:
053: /**
054: * Create a new RateLimitingPredicate for the given sink,
055: * targetRate, and token bucket depth. A rate of -1.0 indicates no rate limit.
056: */
057: public MulticlassRateLimitingPredicate(SinkIF sink, int numclasses,
058: double targetRate, int depth) {
059: this .thesink = sink;
060: this .NUM_CLASSES = numclasses;
061:
062: this .targetRate = new double[NUM_CLASSES];
063: this .depth = new int[NUM_CLASSES];
064: this .regenTimeMS = new double[NUM_CLASSES];
065: this .tokenCount = new double[NUM_CLASSES];
066: this .lasttime = new long[NUM_CLASSES];
067:
068: for (int c = 0; c < NUM_CLASSES; c++) {
069: this .targetRate[c] = targetRate;
070: this .regenTimeMS[c] = (1.0 / this .targetRate[c]) * 1.0e3;
071: if (this .regenTimeMS[c] < 1)
072: this .regenTimeMS[c] = 1;
073: this .depth[c] = depth;
074: this .tokenCount[c] = depth * 1.0;
075: this .lasttime[c] = System.currentTimeMillis();
076: }
077:
078: System.err.println("MulticlassRateLimitingPredicate<"
079: + sink.toString() + ">: Created");
080:
081: if (PROFILE) {
082: interArrivalStats = new StatsGatherer("IA<"
083: + sink.toString() + ">", "IA<" + sink.toString()
084: + ">", 1, 0);
085: acceptArrivalStats = new StatsGatherer("AA<"
086: + sink.toString() + ">", "AA<" + sink.toString()
087: + ">", 1, 0);
088: }
089: }
090:
091: /**
092: * Returns true if the given element can be accepted into the queue.
093: */
094: public boolean accept(QueueElementIF qel) {
095:
096: if (DEBUG)
097: System.err.println("MCRLP <" + thesink.toString()
098: + ": Got " + qel);
099:
100: int c = 0;
101: if (qel instanceof ClassQueueElementIF) {
102: ClassQueueElementIF cqel = (ClassQueueElementIF) qel;
103: c = cqel.getRequestClass();
104: if (c == -1)
105: c = 0;
106: }
107: if (DEBUG)
108: System.err.println("MCRLP <" + thesink.toString()
109: + ": Class is " + c);
110:
111: if (targetRate[c] == -1.0)
112: return true;
113:
114: // First regenerate tokens
115: long curtime = System.currentTimeMillis();
116: long delay = curtime - lasttime[c];
117:
118: if (PROFILE) {
119: interArrivalStats.add(delay);
120: }
121:
122: if (delay >= MIN_REGEN_TIME) {
123: double numTokens = ((double) delay * 1.0)
124: / (regenTimeMS[c] * 1.0);
125: tokenCount[c] += numTokens;
126: if (tokenCount[c] > depth[c])
127: tokenCount[c] = depth[c];
128: lasttime[c] = curtime;
129: }
130:
131: if (tokenCount[c] >= 1.0) {
132: tokenCount[c] -= 1.0;
133: if (PROFILE) {
134: acceptArrivalStats.add(delay);
135: }
136: return true;
137: } else {
138: return false;
139: }
140: }
141:
142: /**
143: * Return the current rate limit.
144: */
145: public double getTargetRate(int theclass) {
146: return targetRate[theclass];
147: }
148:
149: /**
150: * Return the current depth.
151: */
152: public int getDepth(int theclass) {
153: return depth[theclass];
154: }
155:
156: /**
157: * Return the number of tokens currently in the bucket.
158: */
159: public int getBucketSize(int theclass) {
160: return (int) tokenCount[theclass];
161: }
162:
163: /**
164: * Set the rate limit. A limit of -1.0 indicates no rate limit.
165: */
166: public void setTargetRate(int theclass, double targetRate) {
167: // Kill off old tokens if reducing rate
168: if (targetRate < this .targetRate[theclass]) {
169: this .tokenCount[theclass] = 0;
170: }
171:
172: this .targetRate[theclass] = targetRate;
173: this .regenTimeMS[theclass] = (1.0 / targetRate) * 1.0e3;
174: if (regenTimeMS[theclass] < 1)
175: regenTimeMS[theclass] = 1;
176: }
177:
178: /**
179: * Set the bucket depth.
180: */
181: public void setDepth(int theclass, int depth) {
182: this.depth[theclass] = depth;
183: }
184:
185: }
|