001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import java.util.*;
032:
033: /**
034: * TPSThreadManager provides a threadpool-per-source-per-stage thread
035: * manager implementation.
036: *
037: * @author Matt Welsh
038: */
039:
040: public class TPSThreadManager implements ThreadManagerIF,
041: sandStormConst {
042:
043: private static final boolean DEBUG = false;
044: private static final boolean DEBUG_VERBOSE = false;
045:
046: protected ManagerIF mgr;
047: protected SandstormConfig config;
048: protected Hashtable srTbl;
049: protected ThreadPoolController sizeController;
050:
051: public TPSThreadManager(ManagerIF mgr) {
052: this (mgr, true);
053: }
054:
055: public TPSThreadManager(ManagerIF mgr, boolean initialize) {
056: this .mgr = mgr;
057: this .config = mgr.getConfig();
058:
059: if (initialize) {
060: if (config
061: .getBoolean("global.threadPool.sizeController.enable")) {
062: sizeController = new ThreadPoolController(mgr);
063: }
064: srTbl = new Hashtable();
065: }
066: }
067:
068: /**
069: * Register a stage with this thread manager.
070: */
071: public void register(StageWrapperIF stage) {
072: // Create a threadPool for the stage
073: stageRunnable sr = new stageRunnable(stage);
074: srTbl.put(sr, stage);
075: }
076:
077: /**
078: * Deregister a stage with this thread manager.
079: */
080: public void deregister(StageWrapperIF stage) {
081: Enumeration e = srTbl.keys();
082: while (e.hasMoreElements()) {
083: stageRunnable sr = (stageRunnable) e.nextElement();
084: StageWrapperIF s = (StageWrapperIF) srTbl.get(sr);
085: if (s == stage) {
086: sr.tp.stop();
087: srTbl.remove(sr);
088: }
089: }
090: }
091:
092: /**
093: * Stop the thread manager and all threads managed by it.
094: */
095: public void deregisterAll() {
096: Enumeration e = srTbl.keys();
097: while (e.hasMoreElements()) {
098: stageRunnable sr = (stageRunnable) e.nextElement();
099: StageWrapperIF s = (StageWrapperIF) srTbl.get(sr);
100: sr.tp.stop();
101: srTbl.remove(sr);
102: }
103: }
104:
105: /**
106: * Internal class representing the Runnable for a single stage.
107: */
108: public class stageRunnable implements Runnable {
109:
110: protected ThreadPool tp;
111: protected StageWrapperIF wrapper;
112: protected SourceIF source;
113: protected String name;
114: protected ResponseTimeControllerIF rtController = null;
115: protected boolean firstToken = false;
116: protected int aggTarget = -1;
117:
118: protected stageRunnable(StageWrapperIF wrapper, ThreadPool tp) {
119: this .wrapper = wrapper;
120: this .tp = tp;
121: this .source = wrapper.getSource();
122: this .name = wrapper.getStage().getName();
123:
124: if (tp != null) {
125: if (sizeController != null) {
126: // The sizeController is globally enabled -- has the user disabled
127: // it for this stage?
128: String val = config.getString("stages." + this .name
129: + ".threadPool.sizeController.enable");
130: if ((val == null) || val.equals("true")
131: || val.equals("TRUE")) {
132: sizeController.register(wrapper, tp);
133: }
134: }
135: }
136: this .rtController = wrapper.getResponseTimeController();
137:
138: if (tp != null)
139: tp.start();
140: }
141:
142: protected stageRunnable(StageWrapperIF wrapper) {
143: this .wrapper = wrapper;
144: this .tp = tp;
145: this .source = wrapper.getSource();
146: this .name = wrapper.getStage().getName();
147:
148: // Create a threadPool for the stage
149: if (wrapper.getEventHandler() instanceof SingleThreadedEventHandlerIF) {
150: tp = new ThreadPool(wrapper, mgr, this , 1);
151: } else {
152: tp = new ThreadPool(wrapper, mgr, this );
153: }
154:
155: if (sizeController != null) {
156: // The sizeController is globally enabled -- has the user disabled
157: // it for this stage?
158: String val = config.getString("stages." + this .name
159: + ".threadPool.sizeController.enable");
160: if ((val == null) || val.equals("true")
161: || val.equals("TRUE")) {
162: sizeController.register(wrapper, tp);
163: }
164: }
165: this .rtController = wrapper.getResponseTimeController();
166:
167: tp.start();
168: }
169:
170: public void run() {
171: int blockTime;
172: long t1, t2;
173: long tstart = 0, tend = 0;
174: boolean isFirst = false;
175:
176: if (DEBUG)
177: System.err.println(name + ": starting, source is "
178: + source);
179:
180: t1 = System.currentTimeMillis();
181:
182: while (true) {
183:
184: synchronized (this ) {
185: if (firstToken == false) {
186: firstToken = true;
187: isFirst = true;
188: }
189: }
190:
191: try {
192:
193: blockTime = (int) tp.getBlockTime();
194: aggTarget = tp.getAggregationTarget();
195:
196: if (DEBUG_VERBOSE)
197: System.err.println(name
198: + ": Doing blocking dequeue for "
199: + wrapper);
200:
201: QueueElementIF fetched[];
202: if (aggTarget == -1) {
203: if (DEBUG_VERBOSE)
204: System.err.println("TPSTM <" + this .name
205: + "> dequeue (aggTarget -1)");
206: fetched = source
207: .blocking_dequeue_all(blockTime);
208: } else {
209: if (DEBUG_VERBOSE)
210: System.err.println("TPSTM <" + this .name
211: + "> dequeue (aggTarget "
212: + aggTarget + ")");
213: fetched = source.blocking_dequeue(blockTime,
214: aggTarget);
215: }
216:
217: if (fetched == null) {
218: t2 = System.currentTimeMillis();
219: if (tp.timeToStop(t2 - t1)) {
220: if (DEBUG)
221: System.err.println(name + ": Exiting");
222: if (isFirst) {
223: synchronized (this ) {
224: firstToken = false;
225: }
226: }
227: return;
228: }
229: continue;
230: }
231:
232: t1 = System.currentTimeMillis();
233:
234: if (DEBUG_VERBOSE)
235: System.err.println(name + ": Got "
236: + fetched.length + " elements for "
237: + wrapper);
238:
239: /* Process events */
240: tstart = System.currentTimeMillis();
241: wrapper.getEventHandler().handleEvents(fetched);
242: tend = System.currentTimeMillis();
243:
244: /* Record service rate */
245: ((StageWrapper) wrapper).getStats()
246: .recordServiceRate(fetched.length,
247: tend - tstart);
248:
249: /* Run response time controller controller */
250: if (rtController != null) {
251: if (rtController instanceof ResponseTimeControllerMM1) {
252: ((ResponseTimeControllerMM1) rtController)
253: .adjustThreshold(fetched, tstart,
254: tend, isFirst, tp
255: .numThreads());
256: } else {
257: rtController.adjustThreshold(fetched, tend
258: - tstart);
259: }
260: }
261:
262: if (tp.timeToStop(0)) {
263: if (DEBUG)
264: System.err.println(name + ": Exiting");
265: if (isFirst) {
266: synchronized (this ) {
267: firstToken = false;
268: }
269: }
270: return;
271: }
272:
273: Thread.currentThread().yield();
274:
275: } catch (Exception e) {
276: //System.err.println("TPSThreadManager: appThread ["+name+"] got exception "+e);
277: //e.printStackTrace();
278: }
279: }
280: }
281: }
282:
283: }
|