001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.internal.*;
031: import seda.sandStorm.main.*;
032: import java.util.*;
033:
034: /**
035: * aSocketThreadManager provides a thread manager for the aSocket layer:
036: * one thread for each of the read, write, and listen stages.
037: *
038: * @author Matt Welsh
039: */
040: class aSocketThreadManager implements ThreadManagerIF, aSocketConst {
041:
042: private static final boolean DEBUG = false;
043:
044: private ManagerIF mgr;
045:
046: aSocketThreadManager(ManagerIF mgr) {
047: this .mgr = mgr;
048: }
049:
050: protected aSocketThread makeThread(aSocketStageWrapper wrapper) {
051: return new aSocketThread(wrapper);
052: }
053:
054: /**
055: * Register a stage with this thread manager.
056: */
057: public void register(StageWrapperIF thestage) {
058: aSocketStageWrapper stage = (aSocketStageWrapper) thestage;
059: aSocketThread at = makeThread(stage);
060: ThreadPool tp = new ThreadPool(stage, mgr, at, 1);
061: at.registerTP(tp);
062: tp.start();
063: }
064:
065: /**
066: * Deregister a stage with this thread manager.
067: */
068: public void deregister(StageWrapperIF stage) {
069: throw new IllegalArgumentException(
070: "aSocketThreadManager: deregister not supported");
071: }
072:
073: /**
074: * Deregister all stages from this thread manager.
075: */
076: public void deregisterAll() {
077: throw new IllegalArgumentException(
078: "aSocketThreadManager: deregisterAll not supported");
079: }
080:
081: /**
082: * Internal class representing a single aSocketTM-managed thread.
083: */
084: protected class aSocketThread implements Runnable {
085:
086: protected ThreadPool tp;
087: protected StageWrapperIF wrapper;
088: protected SelectSourceIF selsource;
089: protected SourceIF eventQ;
090: protected String name;
091: protected EventHandlerIF handler;
092:
093: protected aSocketThread(aSocketStageWrapper wrapper) {
094: if (DEBUG)
095: System.err.println("!!!!!aSocketThread init");
096: this .wrapper = wrapper;
097: this .name = "aSocketThread <"
098: + wrapper.getStage().getName() + ">";
099: this .selsource = wrapper.getSelectSource();
100: this .eventQ = wrapper.getEventQueue();
101: this .handler = wrapper.getEventHandler();
102: }
103:
104: void registerTP(ThreadPool tp) {
105: this .tp = tp;
106: }
107:
108: public void run() {
109: int aggTarget;
110: if (DEBUG)
111: System.err.println(name + ": starting, selsource="
112: + selsource + ", eventQ=" + eventQ
113: + ", handler=" + handler);
114:
115: while (true) {
116:
117: if (DEBUG)
118: System.err.println(name + ": Looping in run()");
119: try {
120:
121: aggTarget = tp.getAggregationTarget();
122:
123: while (selsource != null
124: && selsource.numActive() == 0) {
125: if (DEBUG)
126: System.err
127: .println(name
128: + ": numActive is zero, waiting on event queue");
129: QueueElementIF qelarr[];
130: if (aggTarget == -1) {
131: qelarr = eventQ
132: .blocking_dequeue_all(EVENT_QUEUE_TIMEOUT);
133: } else {
134: qelarr = eventQ.blocking_dequeue(
135: EVENT_QUEUE_TIMEOUT, aggTarget);
136: }
137:
138: if (qelarr != null) {
139: if (DEBUG)
140: System.err.println(name + ": got "
141: + qelarr.length
142: + " new requests");
143: handler.handleEvents(qelarr);
144: }
145: }
146:
147: for (int s = 0; s < SELECT_SPIN; s++) {
148: if (DEBUG)
149: System.err.println(name
150: + ": doing select, numActive "
151: + selsource.numActive());
152: SelectQueueElement ret[];
153:
154: if (aggTarget == -1) {
155: ret = (SelectQueueElement[]) selsource
156: .blocking_dequeue_all(SELECT_TIMEOUT);
157: } else {
158: ret = (SelectQueueElement[]) selsource
159: .blocking_dequeue(SELECT_TIMEOUT,
160: aggTarget);
161: }
162:
163: if (ret != null) {
164: if (DEBUG)
165: System.err.println(name
166: + ": select got " + ret.length
167: + " elements");
168: long tstart = System.currentTimeMillis();
169: handler.handleEvents(ret);
170: long tend = System.currentTimeMillis();
171: wrapper.getStats().recordServiceRate(
172: ret.length, tend - tstart);
173:
174: } else if (DEBUG)
175: System.err.println(name
176: + ": select got null");
177: }
178:
179: if (DEBUG)
180: System.err.println(name
181: + ": Checking request queue");
182: for (int s = 0; s < EVENT_QUEUE_SPIN; s++) {
183: QueueElementIF qelarr[];
184: if (aggTarget == -1) {
185: qelarr = eventQ.dequeue_all();
186: } else {
187: qelarr = eventQ.dequeue(aggTarget);
188: }
189: if (qelarr != null) {
190: if (DEBUG)
191: System.err.println(name + ": got "
192: + qelarr.length
193: + " new requests");
194: handler.handleEvents(qelarr);
195: break;
196: }
197: }
198:
199: Thread.currentThread().yield();
200:
201: } catch (Exception e) {
202: System.err.println(name + ": got exception " + e);
203: e.printStackTrace();
204: }
205: }
206: }
207:
208: }
209:
210: }
|