001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.net.URI;
030: import java.util.Comparator;
031: import java.util.HashMap;
032: import java.util.HashSet;
033: import java.util.Iterator;
034:
035: import org.cougaar.core.component.ServiceBroker;
036: import org.cougaar.core.mts.MessageAddress;
037: import org.cougaar.core.service.ThreadControlService;
038: import org.cougaar.core.service.ThreadListenerService;
039: import org.cougaar.core.service.ThreadService;
040: import org.cougaar.core.service.wp.AddressEntry;
041: import org.cougaar.core.service.wp.WhitePagesService;
042: import org.cougaar.core.thread.Schedulable;
043: import org.cougaar.core.thread.ThreadListener;
044: import org.cougaar.mts.base.BoundComponent;
045: import org.cougaar.mts.base.DestinationQueue;
046:
047: /**
048: * This test Component restricts the number of available threads and
049: * modifies the thread queue order to be more fair with the remaining
050: * rights.
051: */
052: public class DestinationThreadConstrictor extends BoundComponent
053: implements ThreadListener {
054:
055: private class Constrictor implements Comparator {
056:
057: int timestamp_compare(Object o1, Object o2) {
058: long t1 = getTimestamp(o1);
059: long t2 = getTimestamp(o2);
060: if (t1 < t2)
061: return -1;
062: else if (t1 > t2)
063: return 1;
064: else
065: return 0;
066: }
067:
068: public int compare(Object o1, Object o2) {
069: int count1 = getSchedulableNodeCount((Schedulable) o1);
070: int count2 = getSchedulableNodeCount((Schedulable) o2);
071:
072: if (count1 == count2) {
073: // Since they're equal, only the timestamp is relevant.
074: return timestamp_compare(o1, o2);
075: }
076:
077: if (count1 < maxPerNode && count2 < maxPerNode) {
078: // Both are under the limit, so use the timestamp.
079: return timestamp_compare(o1, o2);
080: }
081:
082: // If we get here, one or both exceeds the max, but not
083: // equally. Prefer the smaller one.
084: int compare = 0;
085: if (count1 < count2)
086: compare = -1; // prefer the first
087: else
088: compare = 1; // prefer the second
089:
090: if (loggingService.isInfoEnabled()) {
091: // If the result is different from the default, based
092: // on timestamps, log that here.
093: int default_compare = timestamp_compare(o1, o2);
094: if (compare == default_compare) {
095: // silent
096: } else if (compare == -1) {
097: loggingService.info("Prefer " + o1 + "[" + count1
098: + "] even though " + o2 + "[" + count2
099: + "] is older");
100: } else {
101: loggingService.info("Prefer " + o2 + "[" + count2
102: + "] even though " + o1 + "[" + count1
103: + "] is older");
104: }
105: }
106:
107: return compare;
108: }
109:
110: public boolean equals(Object x) {
111: return x == this ;
112: }
113:
114: }
115:
116: private static final int MAX_PER_NODE_DEFAULT = 1;
117: private static final int MAX_THREADS_DEFAULT = 15;
118: private static final String TOPOLOGY = "topology";
119:
120: private HashMap counts = new HashMap();
121: private HashMap timestamps = new HashMap();
122: private HashMap agent_nodes = new HashMap();
123: private HashSet agents = new HashSet();
124: private WhitePagesService wpService;
125: private int maxPerNode;
126: private int maxThreads;
127:
128: private long getTimestamp(Object x) {
129: synchronized (timestamps) {
130: return ((Long) timestamps.get(x)).longValue();
131: }
132: }
133:
134: private int getSchedulableNodeCount(Schedulable sched) {
135: Object o = sched.getConsumer();
136: if (o instanceof DestinationQueue) {
137: MessageAddress addr = ((DestinationQueue) o)
138: .getDestination();
139: String node = getNodeName(addr);
140: if (node == null)
141: return 0;
142: Integer count = null;
143: synchronized (counts) {
144: count = (Integer) counts.get(node);
145: }
146: if (count == null)
147: return 0;
148: return count.intValue();
149: } else {
150: return 0;
151: }
152:
153: }
154:
155: private String getNodeName(MessageAddress agent) {
156: synchronized (agent_nodes) {
157: String node = (String) agent_nodes.get(agent.getPrimary());
158: if (node != null)
159: return node;
160: }
161:
162: synchronized (agents) {
163: agents.add(agent.getPrimary());
164: }
165:
166: if (loggingService.isDebugEnabled())
167: loggingService
168: .debug("Couldn't find node of Agent " + agent);
169:
170: return null;
171: }
172:
173: // Runs in a scheduled task, since we don't want to call WP in
174: // the Comparator.
175: private void lookupNodes() {
176: // TBD: Agent -> Node
177: // WP call? Ugh.
178: HashSet copy = null;
179: synchronized (agents) {
180: copy = new HashSet(agents);
181: }
182:
183: Iterator itr = copy.iterator();
184: while (itr.hasNext()) {
185: MessageAddress agent = (MessageAddress) itr.next();
186: try {
187: AddressEntry entry = wpService.get(agent.getAddress(),
188: TOPOLOGY, -1);
189: if (entry != null) {
190: URI uri = entry.getURI();
191: synchronized (agent_nodes) {
192: agent_nodes.put(agent, uri.getPath().substring(
193: 1));
194: }
195: } else {
196: if (loggingService.isDebugEnabled())
197: loggingService
198: .debug("Couldn't find node of Agent "
199: + agent + ": WP returned null");
200: }
201: } catch (Exception ex) {
202: if (loggingService.isDebugEnabled())
203: loggingService.debug("Couldn't find node of Agent "
204: + agent + ": " + ex.getMessage());
205: }
206: }
207: }
208:
209: private void incrementCount(Object o) {
210: if (o instanceof DestinationQueue) {
211: MessageAddress addr = ((DestinationQueue) o)
212: .getDestination();
213: String node = getNodeName(addr);
214: if (node == null)
215: return;
216: synchronized (counts) {
217: Integer count = (Integer) counts.get(node);
218: if (count == null)
219: count = new Integer(1);
220: else
221: count = new Integer(count.intValue() + 1);
222: counts.put(node, count);
223: if (count.intValue() >= maxThreads) {
224: if (loggingService.isWarnEnabled())
225: loggingService.warn("Node " + node
226: + " is using all the threads [" + count
227: + "] in pool");
228: }
229: if (loggingService.isDebugEnabled())
230: loggingService.debug("Increment: count for " + addr
231: + " on node " + node + " = " + count);
232: }
233: }
234: }
235:
236: private void decrementCount(Object o) {
237: if (o instanceof DestinationQueue) {
238: MessageAddress addr = ((DestinationQueue) o)
239: .getDestination();
240: String node = getNodeName(addr);
241: if (node == null)
242: return;
243: synchronized (counts) {
244: Integer count = (Integer) counts.get(node);
245: if (count != null)
246: count = new Integer(count.intValue() - 1);
247: else
248: count = new Integer(0);
249: if (loggingService.isDebugEnabled())
250: loggingService.debug("Decrement: count for " + addr
251: + " on node " + node + " = " + count);
252: counts.put(node, count);
253: }
254: }
255: }
256:
257: public void start() {
258: super .start();
259:
260: maxPerNode = (int) getParameter("MaxPerNode",
261: MAX_PER_NODE_DEFAULT);
262: maxThreads = (int) getParameter("MaxThreads",
263: MAX_THREADS_DEFAULT);
264: int lane = ThreadService.WILL_BLOCK_LANE;
265:
266: ServiceBroker sb = getServiceBroker();
267:
268: wpService = (WhitePagesService) sb.getService(this ,
269: WhitePagesService.class, null);
270: if (wpService == null) {
271: throw new RuntimeException("Can't get WhitePagesService");
272: }
273:
274: ThreadService tsvc = (ThreadService) sb.getService(this ,
275: ThreadService.class, null);
276: Runnable runnable = new Runnable() {
277: public void run() {
278: lookupNodes();
279: }
280: };
281: Schedulable sched = tsvc.getThread(this , runnable,
282: "Constrictor Node Lookup");
283: sched.schedule(0, 1000);
284: sb.releaseService(this , ThreadService.class, tsvc);
285:
286: ThreadListenerService tls = (ThreadListenerService) sb
287: .getService(this , ThreadListenerService.class, null);
288: if (tls != null) {
289: tls.addListener(this , lane);
290: } else {
291: throw new RuntimeException(
292: "Can't get ThreadListenerService");
293: }
294:
295: ThreadControlService tcs = (ThreadControlService) sb
296: .getService(this , ThreadControlService.class, null);
297: if (tcs != null) {
298: Comparator cmp = new Constrictor();
299: tcs.setQueueComparator(cmp, lane);
300: tcs.setMaxRunningThreadCount(maxThreads, lane);
301: } else {
302: tls.removeListener(this , lane);
303: sb.releaseService(this , ThreadListenerService.class, tls);
304: throw new RuntimeException("Can't get ThreadControlService");
305: }
306:
307: sb.releaseService(this , ThreadListenerService.class, tls);
308: sb.releaseService(this , ThreadControlService.class, tcs);
309:
310: }
311:
312: // ThreadListener
313:
314: public void threadStarted(Schedulable schedulable, Object consumer) {
315: incrementCount(consumer);
316: }
317:
318: public void threadStopped(Schedulable schedulable, Object consumer) {
319: decrementCount(consumer);
320: }
321:
322: public void threadQueued(Schedulable schedulable, Object consumer) {
323: synchronized (timestamps) {
324: timestamps.put(schedulable, new Long(System
325: .currentTimeMillis()));
326: }
327: }
328:
329: public void threadDequeued(Schedulable schedulable, Object consumer) {
330: synchronized (timestamps) {
331: timestamps.remove(schedulable);
332: }
333: }
334:
335: public void rightGiven(String consumer) {
336: }
337:
338: public void rightReturned(String consumer) {
339: }
340:
341: }
|