001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.thread;
028:
029: import java.util.Iterator;
030: import java.util.List;
031:
032: import org.cougaar.bootstrap.SystemProperties;
033: import org.cougaar.core.component.Component;
034: import org.cougaar.core.component.ServiceBroker;
035: import org.cougaar.core.component.ServiceProvider;
036: import org.cougaar.core.mts.MessageAddress;
037: import org.cougaar.core.node.NodeControlService;
038: import org.cougaar.core.node.NodeIdentificationService;
039: import org.cougaar.core.service.AgentIdentificationService;
040: import org.cougaar.core.service.ThreadControlService;
041: import org.cougaar.core.service.ThreadListenerService;
042: import org.cougaar.core.service.ThreadService;
043: import org.cougaar.util.GenericStateModelAdapter;
044: import org.cougaar.util.StateModelException;
045:
046: /**
047: * This component is the {@link ServiceProvider} for the {@link
048: * ThreadService}, {@link ThreadControlService}, {@link
049: * ThreadListenerService}, and {@link ThreadStatusService}.
050: *
051: */
052: public final class ThreadServiceProvider extends
053: GenericStateModelAdapter implements ServiceProvider, Component {
054:
055: private static final String SERVICE_TYPE_PROPERTY = "org.cougaar.thread.service.type";
056:
057: private static ThreadPool[] pools;
058: private static int[] lane_sizes = new int[ThreadService.LANE_COUNT];
059:
060: private static synchronized void makePools() {
061: if (pools != null)
062: return;
063:
064: pools = new ThreadPool[ThreadService.LANE_COUNT];
065: int initializationCount = 10; // could be a param
066: for (int i = 0; i < pools.length; i++)
067: pools[i] = new ThreadPool(lane_sizes[i],
068: initializationCount, "Pool-" + i);
069: }
070:
071: private static synchronized void stopPools() {
072: if (pools == null)
073: return;
074: for (int i = 0; i < pools.length; i++) {
075: pools[i].stopAllThreads();
076: }
077: pools = null;
078: }
079:
080: static final boolean validateThreadStatusServiceClient(Object client) {
081: return (client instanceof TopServlet)
082: || (client instanceof RogueThreadDetector)
083: || (client instanceof org.cougaar.core.node.StateDumpServiceComponent);
084: }
085:
086: private ServiceBroker sb;
087: private boolean isRoot;
088: private ThreadListenerProxy listenerProxy;
089: private ThreadControlService controlProxy;
090: private ThreadServiceProxy proxy;
091: private ThreadStatusService statusProxy;
092: private String name;
093: private int laneCount = ThreadService.LANE_COUNT;
094: private TrivialThreadServiceProvider threadServiceProvider;
095:
096: public ThreadServiceProvider() {
097: }
098:
099: public void setServiceBroker(ServiceBroker sb) {
100: this .sb = sb;
101: }
102:
103: public void load() {
104: super .load();
105:
106: ServiceBroker the_sb = sb;
107: isRoot = !the_sb.hasService(ThreadService.class);
108:
109: String type = SystemProperties.getProperty(
110: SERVICE_TYPE_PROPERTY, "hierarchical");
111: if (type.equals("trivial")) {
112: if (isRoot) {
113: threadServiceProvider = new TrivialThreadServiceProvider();
114: threadServiceProvider.setServiceBroker(the_sb);
115: threadServiceProvider.initialize();
116: threadServiceProvider.load();
117: threadServiceProvider.start();
118: }
119: return;
120: } else if (type.equals("single")) {
121: if (isRoot) {
122: threadServiceProvider = new SingleThreadServiceProvider();
123: threadServiceProvider.setServiceBroker(the_sb);
124: threadServiceProvider.initialize();
125: threadServiceProvider.load();
126: threadServiceProvider.start();
127: }
128: return;
129: }
130:
131: // Hierarchical service
132:
133: makePools();
134:
135: // check if this component was added with parameters
136: if (name == null) {
137: // Make default values from position in containment hierarcy
138: AgentIdentificationService ais = (AgentIdentificationService) the_sb
139: .getService(this , AgentIdentificationService.class,
140: null);
141: MessageAddress agentAddr = ais.getMessageAddress();
142: the_sb.releaseService(this ,
143: AgentIdentificationService.class, ais);
144:
145: NodeIdentificationService nis = (NodeIdentificationService) the_sb
146: .getService(this , NodeIdentificationService.class,
147: null);
148: MessageAddress nodeAddr = nis.getMessageAddress();
149: the_sb.releaseService(this ,
150: NodeIdentificationService.class, nis);
151:
152: name = isRoot ? "Node " + nodeAddr : "Agent_" + agentAddr;
153: }
154:
155: if (isRoot) {
156: /*Starter.startThread();
157: Reclaimer.startThread();*/
158: SchedulableStateChangeQueue.startThread();
159: // use the root service broker
160: NodeControlService ncs = (NodeControlService) sb
161: .getService(this , NodeControlService.class, null);
162: the_sb = ncs.getRootServiceBroker();
163:
164: }
165:
166: ThreadService parent = (ThreadService) the_sb.getService(this ,
167: ThreadService.class, null);
168: final TreeNode node = makeProxies(parent);
169: provideServices(the_sb);
170: if (isRoot) {
171: statusProxy = new ThreadStatusService() {
172: public int iterateOverStatus(
173: ThreadStatusService.Body body) {
174: return node.iterateOverQueuedThreads(body)
175: + node.iterateOverRunningThreads(body);
176: }
177: };
178: the_sb.addService(ThreadStatusService.class, this );
179: }
180: }
181:
182: /**
183: * Gracefully unload this component.
184: * <p>
185: * @see org.cougaar.util.GenericStateModelAdapter#unload()
186: */
187: public synchronized void unload() throws StateModelException {
188:
189: if (threadServiceProvider == null) {
190: // Unload hierarchical thread service and Timers
191: if (isRoot) {
192: TreeNode.releaseTimer();
193: SchedulableStateChangeQueue.stopThread();
194: stopPools();
195: }
196: } else {
197: // Unload singleton ThreadServiceProvider and Timer
198: TreeNode.releaseTimer();
199: threadServiceProvider.halt();
200: threadServiceProvider.unload();
201: threadServiceProvider = null;
202: }
203:
204: super .unload();
205: }
206:
207: private void setParameterFromString(String property) {
208: int sepr = property.indexOf('=');
209: if (sepr < 0)
210: return;
211: String key = property.substring(0, sepr);
212: String value = property.substring(++sepr);
213: int lane_index, lane_max;
214:
215: if (key.equals("name")) {
216: name = value;
217: } else if (key.equals("isRoot")) {
218: isRoot = value.equalsIgnoreCase("true");
219: } else if (key.equals("BestEffortAbsCapacity")) {
220: lane_index = ThreadService.BEST_EFFORT_LANE;
221: lane_max = Integer.parseInt(value);
222: lane_sizes[lane_index] = lane_max;
223: } else if (key.equals("WillBlockAbsCapacity")) {
224: lane_index = ThreadService.WILL_BLOCK_LANE;
225: lane_max = Integer.parseInt(value);
226: lane_sizes[lane_index] = lane_max;
227: } else if (key.equals("CpuIntenseAbsCapacity")) {
228: lane_index = ThreadService.CPU_INTENSE_LANE;
229: lane_max = Integer.parseInt(value);
230: lane_sizes[lane_index] = lane_max;
231: } else if (key.equals("WellBehavedAbsCapacity")) {
232: lane_index = ThreadService.WELL_BEHAVED_LANE;
233: lane_max = Integer.parseInt(value);
234: lane_sizes[lane_index] = lane_max;
235: } // add more later
236: }
237:
238: public void setParameter(Object param) {
239: if (param instanceof List) {
240: Iterator itr = ((List) param).iterator();
241: while (itr.hasNext()) {
242: setParameterFromString((String) itr.next());
243: }
244: } else if (param instanceof String) {
245: setParameterFromString((String) param);
246: }
247: }
248:
249: private Scheduler makeScheduler(Object[] args, int lane)
250:
251: {
252: Scheduler scheduler = new PropagatingScheduler(listenerProxy);
253:
254: scheduler.setLane(lane);
255: scheduler.setAbsoluteMax(lane_sizes[lane]);
256: return scheduler;
257: }
258:
259: private TreeNode makeProxies(ThreadService parent) {
260: listenerProxy = new ThreadListenerProxy(laneCount);
261:
262: Object[] actuals = { listenerProxy };
263: Scheduler[] schedulers = new Scheduler[laneCount];
264: for (int i = 0; i < schedulers.length; i++) {
265: schedulers[i] = makeScheduler(actuals, i);
266: }
267:
268: ThreadServiceProxy parentProxy = (ThreadServiceProxy) parent;
269: TreeNode node = new TreeNode(schedulers, pools, name,
270: parentProxy);
271: proxy = new ThreadServiceProxy(node);
272: controlProxy = new ThreadControlServiceProxy(node);
273: listenerProxy.setTreeNode(node);
274: return node;
275: }
276:
277: private void provideServices(ServiceBroker the_sb) {
278: the_sb.addService(ThreadService.class, this );
279: the_sb.addService(ThreadControlService.class, this );
280: the_sb.addService(ThreadListenerService.class, this );
281: }
282:
283: public Object getService(ServiceBroker sb, Object requestor,
284: Class serviceClass) {
285: if (serviceClass == ThreadService.class) {
286: return proxy;
287: } else if (serviceClass == ThreadControlService.class) {
288: // Later this will be tightly restricted
289: return controlProxy;
290: } else if (serviceClass == ThreadListenerService.class) {
291: return listenerProxy;
292: } else if (serviceClass == ThreadStatusService.class) {
293: if (validateThreadStatusServiceClient(requestor))
294: return statusProxy;
295: else
296: return null;
297: } else {
298: return null;
299: }
300: }
301:
302: public void releaseService(ServiceBroker sb, Object requestor,
303: Class serviceClass, Object service) {
304: }
305:
306: }
|