001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import java.util.*;
032:
033: /**
034: * TPPThreadManager is a thread manager implementation which provides
035: * one thread per CPU.
036: *
037: * @author Matt Welsh
038: */
039:
040: class TPPThreadManager implements ThreadManagerIF {
041:
042: private static final boolean DEBUG = false;
043: private static final boolean DEBUG_VERBOSE = false;
044:
045: private int num_cpus, max_threads;
046: private Vector stages;
047: private Vector threads;
048: private ThreadGroup tg;
049:
050: /**
051: * Create an TPPThreadManager which attempts to schedule stages on
052: * num_cpus CPUs, and caps its thread usage to max_threads.
053: */
054: TPPThreadManager(SandstormConfig config) {
055: this .num_cpus = config.getInt("global.TPPTM.numCpus");
056: this .max_threads = config.getInt("global.TPPTM.maxThreads");
057: stages = new Vector(1);
058: threads = new Vector(num_cpus);
059:
060: tg = new ThreadGroup("TPPThreadManager");
061: for (int i = 0; i < num_cpus; i++) {
062: String name = new String("TPPTM-" + i);
063: Thread t = new Thread(tg, new appThread(name), name);
064: threads.addElement(t);
065: t.start();
066: }
067: }
068:
069: /**
070: * Register a stage with this thread manager.
071: */
072: public void register(StageWrapperIF stage) {
073: synchronized (stages) {
074: stages.addElement(stage);
075: stages.notifyAll();
076: }
077: }
078:
079: /**
080: * Deregister a stage with this thread manager.
081: */
082: public void deregister(StageWrapperIF stage) {
083: if (!stages.removeElement(stage))
084: throw new IllegalArgumentException("Stage " + stage
085: + " not registered with this TM");
086: }
087:
088: /**
089: * Deregister all stage with this thread manager.
090: */
091: public void deregisterAll() {
092: Enumeration e = stages.elements();
093: while (e.hasMoreElements()) {
094: StageWrapperIF stage = (StageWrapperIF) e.nextElement();
095: deregister(stage);
096: }
097: tg.stop();
098: }
099:
100: /**
101: * Internal class representing a single TPPTM-managed thread.
102: */
103: class appThread implements Runnable {
104:
105: private String name;
106:
107: appThread(String name) {
108: this .name = name;
109: }
110:
111: // Simple round-robin scheduling for now
112: public void run() {
113: System.err.println(name + ": starting");
114:
115: while (true) {
116:
117: try {
118:
119: // Wait until we have some stages
120: if (stages.size() == 0) {
121: synchronized (stages) {
122: try {
123: stages.wait();
124: } catch (InterruptedException ie) {
125: // Ignore
126: }
127: }
128: }
129:
130: for (int i = 0; i < stages.size(); i++) {
131: StageWrapperIF s = (StageWrapperIF) stages
132: .elementAt(i);
133: if (DEBUG_VERBOSE)
134: System.err.println(name + ": inspecting "
135: + s);
136: SourceIF src = s.getSource();
137: QueueElementIF qelarr[] = src.dequeue_all();
138: if (qelarr != null) {
139: if (DEBUG)
140: System.err.println(name + ": dequeued "
141: + qelarr.length
142: + " elements for " + s);
143: s.getEventHandler().handleEvents(qelarr);
144: if (DEBUG)
145: System.err
146: .println(name
147: + ": returned from handleEvents for "
148: + s);
149: } else {
150: if (DEBUG_VERBOSE)
151: System.err.println(name
152: + ": got null on dequeue");
153: }
154: }
155:
156: } catch (Exception e) {
157: System.err.println("TPPThreadManager: appThread ["
158: + name + "] got exception " + e);
159: e.printStackTrace();
160: }
161: }
162: }
163: }
164:
165: }
|