001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import java.util.*;
031: import seda.util.*;
032:
033: /**
034: * aSocketRCTMSleep is a version of aSocketThreadManager that incorporates
035: * a rate controller: given a target packet-processing rate, it adjusts
036: * its schedule to attempt to match that rate. The controller is based
037: * on adding controlled pauses to the packet-processing loop.
038: *
039: * @author Matt Welsh
040: */
041: class aSocketRCTMSleep extends aSocketThreadManager implements
042: ThreadManagerIF, aSocketConst {
043:
044: private static final boolean DEBUG = false;
045: private static final int INITIAL_SLEEPTIME = 1;
046: private static final int INITIAL_SLEEPFREQ = 1;
047: private static final int MAX_AGGREGATION = 32;
048: private double targetRate;
049:
050: aSocketRCTMSleep(ManagerIF mgr) {
051: super (mgr);
052: this .targetRate = mgr.getConfig().getInt(
053: "global.aSocket.rateController.rate");
054: System.err.println("aSocketRCTMSleep: Created, target rate "
055: + targetRate);
056: }
057:
058: protected aSocketThread makeThread(aSocketStageWrapper wrapper) {
059: return new aSocketRCThread(wrapper);
060: }
061:
062: /**
063: * Internal class representing a single aSocketTM-managed thread.
064: */
065: protected class aSocketRCThread extends
066: aSocketThreadManager.aSocketThread implements Runnable {
067: private final long MIN_USEFUL_SLEEP = 10;
068:
069: protected aSocketRCThread(aSocketStageWrapper wrapper) {
070: super (wrapper);
071: }
072:
073: public void run() {
074: if (DEBUG)
075: System.err.println(name + ": starting, selsource="
076: + selsource + ", eventQ=" + eventQ
077: + ", targetRate=" + targetRate);
078:
079: long t1, t2;
080: int num_measurements = 0, num_events = 0;
081: long sleeptime = INITIAL_SLEEPTIME;
082: int sleepfreq = INITIAL_SLEEPFREQ;
083: int aggTarget;
084:
085: t1 = System.currentTimeMillis();
086:
087: while (true) {
088:
089: try {
090:
091: aggTarget = tp.getAggregationTarget();
092:
093: while (selsource.numActive() == 0) {
094: if (DEBUG)
095: System.err
096: .println(name
097: + ": numActive is zero, waiting on event queue");
098: QueueElementIF qelarr[];
099: if (aggTarget == -1) {
100: qelarr = eventQ
101: .blocking_dequeue_all(EVENT_QUEUE_TIMEOUT);
102: } else {
103: qelarr = eventQ.blocking_dequeue(
104: EVENT_QUEUE_TIMEOUT, aggTarget);
105: }
106: if (qelarr != null) {
107: if (DEBUG)
108: System.err.println(name + ": got "
109: + qelarr.length
110: + " new requests");
111: num_events += qelarr.length;
112: handler.handleEvents(qelarr);
113: }
114: }
115:
116: for (int s = 0; s < SELECT_SPIN; s++) {
117: if (DEBUG)
118: System.err.println(name
119: + ": doing select, numActive "
120: + selsource.numActive());
121: SelectQueueElement ret[];
122: if (aggTarget == -1) {
123: ret = (SelectQueueElement[]) selsource
124: .blocking_dequeue_all(SELECT_TIMEOUT);
125: } else {
126: ret = (SelectQueueElement[]) selsource
127: .blocking_dequeue(SELECT_TIMEOUT,
128: aggTarget);
129: }
130: if (ret != null) {
131: if (DEBUG)
132: System.err.println(name
133: + ": select got " + ret.length
134: + " elements");
135: num_events += ret.length;
136: handler.handleEvents(ret);
137: } else if (DEBUG)
138: System.err.println(name
139: + ": select got null");
140: }
141:
142: if (DEBUG)
143: System.err.println(name
144: + ": Checking request queue");
145: for (int s = 0; s < EVENT_QUEUE_SPIN; s++) {
146: QueueElementIF qelarr[];
147: if (aggTarget == -1) {
148: qelarr = eventQ.dequeue_all();
149: } else {
150: qelarr = eventQ.dequeue(aggTarget);
151: }
152: if (qelarr != null) {
153: if (DEBUG)
154: System.err.println(name + ": got "
155: + qelarr.length
156: + " new requests");
157: num_events += qelarr.length;
158: handler.handleEvents(qelarr);
159: break;
160: }
161: }
162:
163: } catch (Exception e) {
164: System.err.println(name + ": got exception " + e);
165: e.printStackTrace();
166: }
167:
168: if (((num_measurements % sleepfreq) == 0)
169: && (sleeptime > 0)) {
170: try {
171: Thread.currentThread().sleep(sleeptime);
172: } catch (InterruptedException ie) {
173: // Ignore
174: }
175: }
176:
177: t2 = System.currentTimeMillis();
178: num_measurements++;
179:
180: if ((num_measurements % MEASUREMENT_SIZE) == 0) {
181: double timesec = ((t2 - t1) * 1.0e-3);
182: double actualrate = num_events / timesec;
183: System.err.println("aSocketRCTMSleep (" + name
184: + "): time " + MDWUtil.format(timesec)
185: + ", num_events " + num_events);
186: //if (DEBUG)
187: System.err.println("aSocketRCTMSleep (" + name
188: + "): Rate is "
189: + MDWUtil.format(actualrate) + ", target "
190: + targetRate + ", sleeptime " + sleeptime);
191:
192: if ((actualrate >= (1.05 * targetRate))
193: || (actualrate <= (0.95 * targetRate))) {
194: // Update delay
195: double delay = (num_events / targetRate)
196: - timesec;
197: sleeptime = (long) (delay * 1.0e3);
198: if (sleeptime < 0)
199: sleeptime = 0;
200: if ((sleeptime > 0)
201: && ((sleeptime / MEASUREMENT_SIZE) < MIN_USEFUL_SLEEP)) {
202: sleeptime = MIN_USEFUL_SLEEP;
203: sleepfreq = (int) (MIN_USEFUL_SLEEP / sleeptime);
204: if (sleepfreq < 1)
205: sleepfreq = 1;
206: } else {
207: sleeptime /= MEASUREMENT_SIZE;
208: sleepfreq = 1;
209: }
210: System.err.println("aSocketRCTMSleep (" + name
211: + "): Adjusted sleeptime to "
212: + sleeptime + ", sleepfreq "
213: + sleepfreq);
214: }
215:
216: t1 = System.currentTimeMillis();
217: num_events = 0;
218: }
219:
220: }
221: }
222:
223: }
224:
225: }
|