001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.thread;
028:
029: import java.io.FileInputStream;
030: import java.util.Comparator;
031: import java.util.HashMap;
032: import java.util.Iterator;
033: import java.util.List;
034: import java.util.Map;
035: import java.util.Properties;
036: import java.util.TreeSet;
037:
038: import org.cougaar.bootstrap.SystemProperties;
039: import org.cougaar.core.component.ServiceBroker;
040: import org.cougaar.core.service.ThreadListenerService;
041: import org.cougaar.core.service.ThreadService;
042: import org.cougaar.util.PropertyParser;
043:
044: /**
045: * A sample {@link RightsSelector} that attempts to select among the
046: * children in such a way as to match a set of target percentages.
047: *
048: * @property org.cougaar.thread.targets Specifies a file which
049: * contains percentage targets for the children.
050: */
051: public class PercentageLoadSelector extends RoundRobinSelector
052: implements ThreadListener {
053:
054: private static final String TARGETS_PROP = "org.cougaar.thread.targets";
055: private Map<String, ConsumerRecord> records = new HashMap<String, ConsumerRecord>();
056: private Properties properties = new Properties();
057: private int total = 0;
058: private Comparator<Scheduler> comparator;
059: private TreeSet<Scheduler> orderedChildren;
060:
061: public PercentageLoadSelector(ServiceBroker sb) {
062: String propertiesFilename = SystemProperties
063: .getProperty(TARGETS_PROP);
064: if (propertiesFilename != null) {
065: try {
066: FileInputStream fos = new FileInputStream(
067: propertiesFilename);
068: properties.load(fos);
069: fos.close();
070: } catch (java.io.IOException ex) {
071: ex.printStackTrace();
072: }
073: }
074:
075: comparator = new DistanceComparator();
076:
077: ThreadListenerService tls = (ThreadListenerService) sb
078: .getService(this , ThreadListenerService.class, null);
079: if (tls != null)
080: tls.addListener(this );
081:
082: ThreadService tsvc = (ThreadService) sb.getService(this ,
083: ThreadService.class, null);
084: Runnable body = new SnapShotter();
085: Schedulable sched = tsvc.getThread(this , body);
086: sched.schedule(5000, 1000);
087: sb.releaseService(this , ThreadService.class, tsvc);
088: }
089:
090: ConsumerRecord findRecord(String name) {
091: ConsumerRecord rec = null;
092: synchronized (records) {
093: rec = records.get(name);
094: if (rec == null) {
095: rec = new ConsumerRecord(name);
096: records.put(name, rec);
097: }
098: }
099: return rec;
100: }
101:
102: public void threadQueued(Schedulable schedulable, Object consumer) {
103: }
104:
105: public void threadDequeued(Schedulable schedulable, Object consumer) {
106: }
107:
108: public void threadStarted(Schedulable schedulable, Object consumer) {
109: }
110:
111: public void threadStopped(Schedulable schedulable, Object consumer) {
112: }
113:
114: public void rightGiven(String consumer) {
115: ++total;
116: ConsumerRecord rec = findRecord(consumer);
117: rec.accumulate();
118: ++rec.outstanding;
119: }
120:
121: public void rightReturned(String consumer) {
122: --total;
123: ConsumerRecord rec = findRecord(consumer);
124: rec.accumulate();
125: --rec.outstanding;
126: }
127:
128: private double getSchedulerDistance(Scheduler scheduler) {
129: ConsumerRecord rec = findRecord(scheduler.getName());
130: if (rec != null) {
131: return rec.distance();
132: } else {
133: return 1;
134: }
135: }
136:
137: // RightsSelector
138:
139: // Too inefficient to use but simple to write...
140: private void rankChildren() {
141: TreeSet<Scheduler> result = new TreeSet<Scheduler>(comparator);
142: List<TreeNode> children = scheduler.getTreeNode().getChildren();
143: for (TreeNode child : children) {
144: result.add(child.getScheduler(scheduler.getLane()));
145: }
146: result.add(scheduler);
147: orderedChildren = result;
148: }
149:
150: public SchedulableObject getNextPending() {
151: if (orderedChildren == null) {
152: // Snapshotter hasn't run yet. Round-robin instead.
153: return super .getNextPending();
154: }
155: // Choose the one with the largest distance()
156: Iterator<Scheduler> itr = orderedChildren.iterator();
157: Scheduler next = null;
158: SchedulableObject handoff = null;
159: while (itr.hasNext()) {
160: next = itr.next();
161: // The list contains the scheduler itself as well as its
162: // children, In the former case we can't call
163: // getNextPending since that will recurse forever. We
164: // need the super method, conveniently available as
165: // getNextPendingSuper.
166: if (next == scheduler) {
167: handoff = scheduler.popQueue();
168: } else {
169: handoff = next.getNextPending();
170: }
171: if (handoff != null) {
172: // If we're the parent of the Scheduler to which the
173: // handoff is given, increase the local count.
174: if (next != scheduler) {
175: scheduler.incrementRunCount(next);
176: }
177: return handoff;
178: }
179: }
180: return null;
181: }
182:
183: private class SnapShotter implements Runnable {
184: public void run() {
185: synchronized (records) {
186: Iterator itr = records.values().iterator();
187: while (itr.hasNext()) {
188: ConsumerRecord rec = (ConsumerRecord) itr.next();
189: rec.snapshot();
190: }
191: }
192: rankChildren();
193: }
194: }
195:
196: private class DistanceComparator implements Comparator<Scheduler> {
197: private int hashCompare(Object o1, Object o2) {
198: if (o1.hashCode() < o2.hashCode())
199: return -1;
200: else
201: return 1;
202: }
203:
204: public int compare(Scheduler o1, Scheduler o2) {
205: if (o1 == o2)
206: return 0;
207:
208: Scheduler x = o1;
209: Scheduler y = o2;
210: double x_distance = getSchedulerDistance(x);
211: double y_distance = getSchedulerDistance(y);
212:
213: // Smaller distances are less preferable
214: if (x_distance < y_distance)
215: return 1;
216: else if (x_distance > y_distance)
217: return -1;
218: else
219: return hashCompare(o1, o2);
220: }
221:
222: }
223:
224: private class ConsumerRecord {
225: String name;
226: int outstanding;
227: long timestamp;
228: double accumulator;
229: long snapshot_timestamp;
230: double snapshot_accumulator;
231: double rate;
232: double targetPercentage;
233:
234: ConsumerRecord(String name) {
235: this .name = name;
236: targetPercentage = PropertyParser.getDouble(properties,
237: name, .05);
238: System.err.println(name + " target=" + targetPercentage);
239: }
240:
241: synchronized void snapshot() {
242: long now = System.currentTimeMillis();
243: rate = (accumulator - snapshot_accumulator)
244: / (now - snapshot_timestamp);
245: snapshot_timestamp = now;
246: snapshot_accumulator = accumulator;
247: System.out.println(name + " rate=" + rate);
248: }
249:
250: double distance() {
251: return (targetPercentage - rate) / targetPercentage;
252: }
253:
254: synchronized void accumulate() {
255: long now = System.currentTimeMillis();
256: if (timestamp > 0) {
257: double deltaT = now - timestamp;
258: accumulator += deltaT * outstanding;
259: }
260: timestamp = now;
261: }
262: }
263:
264: }
|