001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.tools.csmart.runtime.plugin;
027:
028: import org.cougaar.core.agent.service.alarm.Alarm;
029: import org.cougaar.core.blackboard.IncrementalSubscription;
030: import org.cougaar.planning.ldm.asset.*;
031: import org.cougaar.planning.ldm.plan.*;
032: import org.cougaar.planning.plugin.legacy.SimplePlugin;
033: import org.cougaar.planning.plugin.util.PluginHelper;
034: import org.cougaar.util.UnaryPredicate;
035:
036: import java.util.Collection;
037: import java.util.Date;
038: import java.util.Enumeration;
039: import java.util.HashMap;
040: import java.util.HashSet;
041: import java.util.Iterator;
042: import java.util.Vector;
043:
044: /**
045: * MetricsInitializerPlugin : Launch a number of tasks and wait
046: * for them to flow through the system, measuring the time and memory
047: * expended. We can also open a back door to a controller process to
048: * publish the outcome.
049: *
050: * Startup Procedure
051: *
052: * As organizations with MetricsProvider capability report in, we
053: * send them an "AreYouReady" task. This task is intended to yield a
054: * 100% confidence allocation result when all the Agents have
055: * initialized.
056: *
057: * When 100% ready reponses have been received from a number of
058: * MetricsProvider organizations that equals the numProviders
059: * parameter, we send a "Start" task to them indicating that they
060: * should begin collecting statistics.
061: *
062: * If sampleInterval is non-zero, then after every sampleInterval seconds
063: * have been completed a statistics sample is taken by issuing a
064: * "Sample" control task.
065: *
066: * The 100% completion of the sample tasks causes a "Finish" task to be
067: * sent which gathers the final statistics.
068: *
069: **/
070: public class MetricsInitializerPlugin extends CSMARTPlugin implements
071: MetricsConstants
072:
073: {
074:
075: // Numeric parameters
076: private int numProviders = 0;
077: private int sampleInterval = 0;
078: private int startDelay = 0;
079: private int maxNumSamples = 0;
080:
081: private long startTime;
082:
083: // private int taskRate;
084: // private int messageBytesPerTask;
085:
086: static final int INITIAL = 0;
087: static final int WAITING = 1;
088: static final int STARTING = 2;
089: static final int RUNNING = 3;
090: static final int FINISHING = 4;
091: static final int FINISHED = 5;
092: static final int SAMPLING = 6;
093: static final int SAMPLING_AND_DONE = 7;
094:
095: // Have we started processing?
096: int state = INITIAL;
097:
098: private HashSet activeControlTasks = new HashSet();
099: private Alarm sampleTimer = null; // Times sampling
100:
101: HashMap metricsProviders = new HashMap();
102:
103: /**
104: * Subscription to our subordinates. When all subordinates have
105: * checked in, we begin the sampling.
106: **/
107: private IncrementalSubscription localAssets;
108: private UnaryPredicate localPredicate = new UnaryPredicate() {
109: public boolean execute(Object o) {
110: return ((o instanceof Asset)
111: && (o instanceof HasRelationships) && (((HasRelationships) o)
112: .isLocal()));
113: }
114: };
115:
116: protected void addMetricsProviders(HasRelationships localAsset) {
117: Collection relationships = localAsset.getRelationshipSchedule()
118: .getMatchingRelationships(Role_MetricsControlProvider);
119:
120: for (Iterator iterator = relationships.iterator(); iterator
121: .hasNext();) {
122: Relationship relationship = (Relationship) iterator.next();
123: HasRelationships metricsProvider = localAsset
124: .getRelationshipSchedule().getOther(relationship);
125: metricsProviders.put(((Asset) metricsProvider).getKey(),
126: metricsProvider);
127: }
128: }
129:
130: private IncrementalSubscription readyAllocations;
131: private IncrementalSubscription startAllocations;
132: private IncrementalSubscription sampleAllocations;
133: private IncrementalSubscription finishAllocations;
134:
135: private static class ControlPredicate implements UnaryPredicate {
136:
137: private Verb verb;
138:
139: ControlPredicate(Verb verb) {
140: this .verb = verb;
141: }
142:
143: public boolean execute(Object o) {
144: if (o instanceof Allocation) {
145: Allocation alloc = (Allocation) o;
146: Task task = alloc.getTask();
147: return task.getVerb().equals(verb);
148: }
149: return false;
150: }
151: }
152:
153: public void setupSubscriptions() {
154: Vector params = getParameters() != null ? new Vector(
155: getParameters()) : null;
156:
157: if ((params.size() < 2) || (params.size() > 4)) {
158: if (log.isDebugEnabled()) {
159: log
160: .debug("MetricsInitializerPlugin Usage: <numProviders> <sampleInterval> [<startDelay> [<maxNumSamples>]]");
161: }
162: return;
163: }
164:
165: // How many Agents are doing measuring?
166: // For each one, need an Entity to appear with the appropriate Role
167: // numProviders = Integer.parseInt((String) params.elementAt(3));
168: numProviders = Integer.parseInt((String) params.elementAt(0));
169:
170: // Time in seconds between samples. If 0, do no sampling
171: // sampleInterval = Integer.parseInt((String) params.elementAt(5));
172: sampleInterval = Integer.parseInt((String) params.elementAt(1));
173:
174: // Number of seconds to wait before starting the run.
175: if (params.size() > 2) {
176: startDelay = Integer.parseInt((String) params.elementAt(2));
177: }
178:
179: if (params.size() > 3) {
180: maxNumSamples = Integer.parseInt((String) params
181: .elementAt(3));
182: }
183: if (log.isDebugEnabled()) {
184: log.debug("MetricsInitializerPlugin: " + " numProviders = "
185: + numProviders + ", sampleInterval = "
186: + sampleInterval + ", startDelay = " + startDelay
187: + ", maxNumSamples = " + maxNumSamples);
188: }
189:
190: localAssets = (IncrementalSubscription) subscribe(localPredicate);
191: // Verb.getVerb("Ready");
192: readyAllocations = (IncrementalSubscription) subscribe(new ControlPredicate(
193: Verb_Ready));
194: // Verb.getVerb("Start");
195: startAllocations = (IncrementalSubscription) subscribe(new ControlPredicate(
196: Verb_Start));
197: // Verb.getVerb("Sample");
198: sampleAllocations = (IncrementalSubscription) subscribe(new ControlPredicate(
199: Verb_Sample));
200: // Verb.getVerb("Finish");
201: finishAllocations = (IncrementalSubscription) subscribe(new ControlPredicate(
202: Verb_Finish));
203: }
204:
205: /**
206: * Execute plugin. We use a simple state machine to take us through
207: * the steps needed. The only somewhat complicated part involves the
208: * SAMPLING state. Periodically (every so many tasks), we can
209: * initiate a statistics sample. The statistics sample is taken in a
210: * fashion similar to all the other control steps: send a control
211: * task and wait for everyone to respond.
212: **/
213: public void execute() {
214: switch (state) {
215: case INITIAL:
216: // No point in looking for metrics providers if there are no local
217: // Assets
218: if (localAssets.size() > 0) {
219: for (Iterator iterator = localAssets.getCollection()
220: .iterator(); iterator.hasNext();) {
221: addMetricsProviders((HasRelationships) iterator
222: .next());
223: }
224: if (checkProviders()) {
225: sendControl(Verb_Ready);
226: setCurrentState(WAITING);
227: }
228: }
229: break;
230:
231: case WAITING:
232: if (checkControlTasks(readyAllocations)) {
233: sendControl(Verb_Start);
234: setCurrentState(STARTING);
235: }
236: break;
237:
238: case STARTING:
239: if (checkControlTasks(startAllocations)) {
240: startTime = System.currentTimeMillis();
241:
242: startRunning();
243:
244: setCurrentState(RUNNING);
245:
246: startSampleTimer();
247: }
248: break;
249:
250: case SAMPLING:
251: // Keep tabs on the society
252: if (checkControlTasks(sampleAllocations)) {
253: // OK, done with sampling. Go back to running
254: setCurrentState(RUNNING);
255: } else {
256: // Only want to break and wait to be called again
257: // if we are still in the SAMPLING state.
258: // If we are in the RUNNING state now, we want
259: // to check if we're done now, if the time has expired,
260: // etc. So we fall through to the RUNNING case.
261: break;
262: }
263:
264: case RUNNING:
265: // Without this next, there's no way to stop!
266: // we'll never go to the "FINISHING" state
267: if (doneRunning()) {
268: finish(); // All root tasks have been sent,
269: break;
270: }
271:
272: // If it's time again, do another sample
273: if (sampleTimer != null && sampleTimer.hasExpired()) {
274: setCurrentState(SAMPLING);
275: sendControl(Verb_Sample);
276: startSampleTimer(); // Restart
277: break;
278: }
279: break;
280:
281: case FINISHING:
282: // the society is done running. Make sure we've done all our sampling
283: if (checkControlTasks(finishAllocations)) {
284: finishEverything(); // in Ray's version, this does a System.exit()
285: }
286: break;
287: }
288: }
289:
290: private void startRunning() {
291: // Send out your root tasks here, for example.
292: // So Statistics would do sendRootTasks()
293: }
294:
295: private boolean doneRunning() {
296: return (new Date().getTime() >= (startTime + ((sampleInterval * 1000L) * maxNumSamples)));
297: }
298:
299: private void startSampleTimer() {
300: if (startDelay > 0) {
301: sampleTimer = wakeAfterRealTime((startDelay * 1000L)
302: + (sampleInterval * 1000L));
303: startDelay = 0;
304: } else if (sampleInterval > 0) {
305: sampleTimer = wakeAfterRealTime(sampleInterval * 1000L);
306: }
307: }
308:
309: private void setCurrentState(int newState) {
310: state = newState;
311: }
312:
313: private void finish() {
314: sendControl(Verb_Finish);
315: setCurrentState(FINISHING);
316: }
317:
318: private boolean checkProviders() {
319: if (log.isInfoEnabled()) {
320: log.info("Num providers: " + numProviders
321: + " metricsProviders.size(): "
322: + metricsProviders.size());
323: }
324: return (metricsProviders.size() >= numProviders);
325: }
326:
327: private boolean checkControlTasks(IncrementalSubscription sub) {
328: if (sub.hasChanged()) {
329: PluginHelper.updateAllocationResult(sub);
330: return (checkControlTasks(sub.getAddedList()) || checkControlTasks(sub
331: .getChangedList()));
332: }
333: return false;
334: }
335:
336: private boolean checkControlTasks(Enumeration elements) {
337: while (elements.hasMoreElements()) {
338: PlanElement pe = (PlanElement) elements.nextElement();
339: AllocationResult estAR = pe.getEstimatedResult();
340: Task task = pe.getTask();
341: if (estAR != null) {
342: if (estAR.getConfidenceRating() >= 1.0) {
343: activeControlTasks.remove(task);
344: }
345: }
346: }
347: return activeControlTasks.isEmpty();
348: }
349:
350: private void sendControl(Verb verb) {
351: for (Iterator iterator = metricsProviders.values().iterator(); iterator
352: .hasNext();) {
353: Asset metricsProvider = (Asset) iterator.next();
354: NewTask task = theLDMF.newTask();
355: task.setDirectObject(metricsProvider);
356: task.setVerb(verb);
357: task.setPlan(theLDMF.getRealityPlan());
358: // Set null set of preferences
359: Vector preferences = new Vector();
360: // if it's a sample task
361: // I could specify:
362: // time to (start) sampling at (optional)
363: // time between samples
364: // number of samples
365:
366: // How would this be used? Realistically, you'd just send 1 sample task
367: // with all that data included
368: // So the time between samples and number of samples would be your initial
369: // input parameters,
370: // and whatever is doing the measuring would respond only when it had finished
371:
372: task.setPreferences(preferences.elements());
373: activeControlTasks.add(task);
374: publishAdd(task);
375: if (!((HasRelationships) metricsProvider).isLocal()) {
376: Allocation alloc = theLDMF.createAllocation(task
377: .getPlan(), task, metricsProvider, null,
378: Role_MetricsControlProvider);
379: publishAdd(alloc);
380: }
381: }
382: }
383:
384: private void finishEverything() {
385: if (log.isDebugEnabled()) {
386: log.debug("All finished!");
387: }
388: //System.exit(0);
389: }
390:
391: //////////////////////////////////////////////
392: // Helper functions below here
393:
394: /** like super.wakeAfter() except always in real (wallclock) time.
395: **/
396: private Alarm wakeAfterRealTime(long delayTime) {
397: if (delayTime <= 0) {
398: if (log.isErrorEnabled()) {
399: log.error("\nwakeAfterRealTime(" + delayTime
400: + ") is in the past!");
401: }
402: delayTime = 1000;
403: }
404:
405: long absTime = System.currentTimeMillis() + delayTime;
406: PluginAlarm pa = new PluginAlarm(absTime);
407: alarmService.addRealTimeAlarm(pa);
408: return pa;
409: }
410:
411: /**
412: * Helper class - a simple Alarm
413: */
414: class PluginAlarm implements Alarm {
415: private long expiresAt;
416: private boolean expired = false;
417:
418: public PluginAlarm(long expirationTime) {
419: expiresAt = expirationTime;
420: }
421:
422: public long getExpirationTime() {
423: return expiresAt;
424: }
425:
426: public synchronized void expire() {
427: if (!expired) {
428: expired = true;
429: MetricsInitializerPlugin.this .blackboard
430: .signalClientActivity();
431: }
432: }
433:
434: public boolean hasExpired() {
435: return expired;
436: }
437:
438: public synchronized boolean cancel() {
439: boolean was = expired;
440: expired = true;
441: return was;
442: }
443:
444: public String toString() {
445: return "<PluginAlarm " + expiresAt
446: + (expired ? "(Expired) " : " ") + "for "
447: + this .toString() + ">";
448: }
449: } // end of PluginAlarm class definition
450: }
|