001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.internal;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.main.*;
031: import seda.sandStorm.lib.aSocket.*;
032: import seda.sandStorm.lib.aDisk.*;
033: import java.io.*;
034: import java.util.*;
035:
036: /**
037: * This class provides management functionality for the Sandstorm
038: * runtime system. It is responsible for initializing the system,
039: * creating and registering stages and thread managers, and other
040: * administrative functions. Stages and thread managers can interact
041: * with this class through the ManagerIF and SystemManagerIF interfaces;
042: * this class should not be used directly.
043: *
044: * @author Matt Welsh
045: * @see seda.sandStorm.api.ManagerIF
046: * @see seda.sandStorm.api.internal.SystemManagerIF
047: *
048: */
049: public class sandStormMgr implements ManagerIF, SystemManagerIF,
050: sandStormConst {
051:
052: private ThreadManagerIF defaulttm;
053: private Hashtable tmtbl;
054:
055: private SandstormConfig mgrconfig;
056: private Hashtable stagetbl;
057: private Vector stagestoinit;
058: private boolean started = false;
059: private sandStormProfiler profiler;
060: private SignalMgr signalMgr;
061:
062: /**
063: * Create a sandStormMgr which reads its configuration from the
064: * given file.
065: */
066: public sandStormMgr(SandstormConfig mgrconfig) throws Exception {
067: this .mgrconfig = mgrconfig;
068:
069: stagetbl = new Hashtable();
070: tmtbl = new Hashtable();
071: stagestoinit = new Vector();
072: signalMgr = new SignalMgr();
073:
074: String dtm = mgrconfig.getString("global.defaultThreadManager");
075: if (dtm == null) {
076: throw new IllegalArgumentException(
077: "No threadmanager specified by configuration");
078: }
079:
080: if (dtm.equals(SandstormConfig.THREADMGR_TPPTM)) {
081: throw new Error("TPPThreadManager is no longer supported.");
082: /* defaulttm = new TPPThreadManager(mgrconfig); */
083: } else if (dtm.equals(SandstormConfig.THREADMGR_TPSTM)) {
084: defaulttm = new TPSThreadManager(this );
085: } else if (dtm.equals(SandstormConfig.THREADMGR_AggTPSTM)) {
086: throw new Error(
087: "AggTPSThreadManager is no longer supported.");
088: /* defaulttm = new AggTPSThreadManager(mgrconfig); */
089: } else {
090: throw new IllegalArgumentException(
091: "Bad threadmanager specified by configuration: "
092: + dtm);
093: }
094:
095: tmtbl.put("default", defaulttm);
096:
097: initialize_io();
098: loadInitialStages();
099: }
100:
101: /**
102: * Start the manager.
103: */
104: public void start() {
105: started = true;
106: System.err.println("Sandstorm: Initializing stages");
107: initStages();
108:
109: // Let the threads start
110: try {
111: System.err
112: .println("Sandstorm: Waiting for all components to start...");
113: Thread.currentThread().sleep(500);
114: } catch (InterruptedException ie) {
115: // Ignore
116: }
117:
118: System.err.println("\nSandstorm: Ready.\n");
119: }
120:
121: /**
122: * Stop the manager.
123: */
124: public void stop() {
125: Enumeration e = tmtbl.keys();
126: while (e.hasMoreElements()) {
127: String name = (String) e.nextElement();
128: ThreadManagerIF tm = (ThreadManagerIF) tmtbl.get(name);
129: System.err.println("Sandstorm: Stopping ThreadManager "
130: + name);
131: tm.deregisterAll();
132: }
133:
134: System.err.println("Sandstorm: Shutting down stages");
135: destroyStages();
136: started = false;
137: }
138:
139: /**
140: * Return a handle to given stage.
141: */
142: public StageIF getStage(String stagename)
143: throws NoSuchStageException {
144: if (stagename == null)
145: throw new NoSuchStageException("no such stage: null");
146: StageWrapperIF wrapper = (StageWrapperIF) stagetbl
147: .get(stagename);
148: if (wrapper == null)
149: throw new NoSuchStageException("no such stage: "
150: + stagename);
151: return wrapper.getStage();
152: }
153:
154: // Initialize the I/O layer
155: private void initialize_io() throws Exception {
156:
157: // Create profiler even if disabled
158: profiler = new sandStormProfiler(this );
159:
160: if (mgrconfig.getBoolean("global.profile.enable")) {
161: System.err.println("Sandstorm: Starting profiler");
162: profiler.start();
163: }
164:
165: if (mgrconfig.getBoolean("global.aSocket.enable")) {
166: System.err.println("Sandstorm: Starting aSocket layer");
167: aSocketMgr.initialize(this , this );
168: }
169:
170: if (mgrconfig.getBoolean("global.aDisk.enable")) {
171: System.err.println("Sandstorm: Starting aDisk layer");
172: AFileMgr.initialize(this , this );
173: }
174: }
175:
176: // Load stages as specified in the SandstormConfig.
177: private void loadInitialStages() throws Exception {
178: Enumeration e = mgrconfig.getStages();
179: if (e == null)
180: return;
181: while (e.hasMoreElements()) {
182: stageDescr descr = (stageDescr) e.nextElement();
183: loadStage(descr);
184: }
185: }
186:
187: /**
188: * Return the default thread manager.
189: */
190: public ThreadManagerIF getThreadManager() {
191: return defaulttm;
192: }
193:
194: /**
195: * Return the thread manager with the given name.
196: */
197: public ThreadManagerIF getThreadManager(String name) {
198: return (ThreadManagerIF) tmtbl.get(name);
199: }
200:
201: /**
202: * Add a thread manager with the given name.
203: */
204: public void addThreadManager(String name, ThreadManagerIF tm) {
205: tmtbl.put(name, tm);
206: }
207:
208: // Load a stage from the given classname with the given config.
209: private void loadStage(stageDescr descr) throws Exception {
210: String stagename = descr.stageName;
211: String classname = descr.className;
212: ConfigData config = new ConfigData(this , descr.initargs);
213: Class theclass = Class.forName(classname);
214: EventHandlerIF evHandler = (EventHandlerIF) theclass
215: .newInstance();
216: System.out.println("Sandstorm: Loaded " + stagename + " from "
217: + classname);
218:
219: StageWrapper wrapper = new StageWrapper((ManagerIF) this ,
220: stagename, evHandler, config, defaulttm,
221: descr.queueThreshold);
222:
223: createStage(wrapper, false);
224: }
225:
226: /**
227: * Create a stage with the given name from the given event handler with
228: * the given initial arguments.
229: */
230: public StageIF createStage(String stageName,
231: EventHandlerIF evHandler, String initargs[])
232: throws Exception {
233: ConfigDataIF config = new ConfigData(this , initargs);
234: if (stagetbl.get(stageName) != null) {
235: // Come up with a better (random) name
236: stageName = stageName + "-" + stagetbl.size();
237: }
238: StageWrapperIF wrapper = new StageWrapper((ManagerIF) this ,
239: stageName, evHandler, config, defaulttm);
240:
241: return createStage(wrapper, true);
242: }
243:
244: /**
245: * Create a stage from the given stage wrapper.
246: * If 'initialize' is true, initialize this stage immediately.
247: */
248: public StageIF createStage(StageWrapperIF wrapper,
249: boolean initialize) throws Exception {
250: String name = wrapper.getStage().getName();
251: if (stagetbl.get(name) != null) {
252: throw new StageNameAlreadyBoundException("Stage name "
253: + name + " already in use");
254: }
255: stagetbl.put(name, wrapper);
256:
257: if (mgrconfig.getBoolean("global.profile.enable")) {
258: profiler.add(wrapper.getStage().getName() + " queueLength",
259: (ProfilableIF) wrapper.getStage().getSink());
260: }
261:
262: if (initialize) {
263: wrapper.init();
264: } else {
265: stagestoinit.addElement(wrapper);
266: }
267: return wrapper.getStage();
268: }
269:
270: /**
271: * Return the system profiler.
272: */
273: public ProfilerIF getProfiler() {
274: return profiler;
275: }
276:
277: /**
278: * Return the system signal manager.
279: */
280: public SignalMgrIF getSignalMgr() {
281: return signalMgr;
282: }
283:
284: /**
285: * Return the SandstormConfig used to initialize this manager.
286: * Actually returns a copy of the SandstormConfig; this prevents
287: * options from being changed once the system has been initialized.
288: */
289: public SandstormConfig getConfig() {
290: return mgrconfig.getCopy();
291: }
292:
293: // Initialize all stages
294: private void initStages() {
295:
296: for (int i = 0; i < stagestoinit.size(); i++) {
297: StageWrapperIF wrapper = (StageWrapperIF) stagestoinit
298: .elementAt(i);
299: try {
300: System.err.println("-- Initializing <"
301: + wrapper.getStage().getName() + ">");
302: wrapper.init();
303: } catch (Exception ex) {
304: System.err
305: .println("Sandstorm: Caught exception initializing stage "
306: + wrapper.getStage().getName()
307: + ": "
308: + ex);
309: ex.printStackTrace();
310: System.err.println("Sandstorm: Exiting.");
311: System.exit(-1);
312: }
313: }
314:
315: signalMgr.trigger(new StagesInitializedSignal());
316: }
317:
318: // Destroy all stages
319: private void destroyStages() {
320: Enumeration e = stagetbl.elements();
321: while (e.hasMoreElements()) {
322: StageWrapperIF wrapper = (StageWrapperIF) e.nextElement();
323: try {
324: wrapper.destroy();
325: } catch (Exception ex) {
326: System.err
327: .println("Sandstorm: Caught exception destroying stage "
328: + wrapper.getStage().getName()
329: + ": "
330: + ex);
331: ex.printStackTrace();
332: }
333: }
334: }
335:
336: }
|