001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.logistics.plugin.utils;
028:
029: import org.cougaar.planning.ldm.plan.Task;
030: import org.cougaar.util.TimeSpan;
031: import org.cougaar.core.blackboard.IncrementalSubscription;
032: import org.cougaar.util.UnaryPredicate;
033: import org.cougaar.core.service.BlackboardService;
034: import org.cougaar.util.log.Logger;
035: import org.cougaar.core.blackboard.Publishable;
036: import java.util.*;
037:
038: /**
039: * A TaskScheduler instance takes as arguments a predicate
040: * and a TaskSchedulingPolicy instance.
041: * It sets up n incremental subscriptions, one corresponding to
042: * each priority level in the TaskSchedulingPolicy.
043: * The predicate for the i<sup>th</sup> incremental subscription
044: * is a logical and of the predicate given as an argument and a
045: * test whether the task has priority i.
046: * The TaskScheduler keeps track of tasks across execute cycles
047: * so that tasks are not cleared away as they normally are from
048: * a subscription (assuming the plugin properly calls initForExecuteCycle
049: * each time through).
050: * The TaskScheduler handles the bookkeeping associated with
051: * stepping through the various phases of processing, based on
052: * both priorities and time intervals, specified in the policy.
053: * The plugin must call initForExecuteCycle before processing the
054: * tasks during each execute cycle and finishedExecuteCycle after
055: * processing the tasks.
056: **/
057:
058: public class TaskScheduler {
059:
060: private TaskSchedulingPolicy.Predicate outerFilter;
061: private TaskSchedulingPolicy policy;
062: private IncrementalSubscription[] subscriptions;
063: private Logger logger;
064: private QuiescenceAccumulator quiescence;
065: private BlackboardService blackboard;
066: private String id;
067: private Storage storage;
068:
069: // lists of added, changed, and removed tasks that lives across
070: // execute cycles; one for each priority/subscription
071: private ArrayList[][] addedLists;
072: private ArrayList[][] changedLists;
073: private ArrayList[][] removedLists;
074:
075: // current phase of processing
076: private int currentPhase;
077:
078: /**
079: * Call this in the plugin's setupSubscriptions method
080: * @param outerFilter selects all tasks of interest
081: * @param policy prioritizes tasks
082: * @param blackboard where to register for subscriptions
083: * @param id an identifier for this scheduler that is unique to this agent
084: */
085: public TaskScheduler(TaskSchedulingPolicy.Predicate outerFilter,
086: TaskSchedulingPolicy policy, BlackboardService blackboard,
087: QuiescenceAccumulator quiescence, Logger logger, String id) {
088: this .outerFilter = outerFilter;
089: this .policy = policy;
090: this .logger = logger;
091: this .blackboard = blackboard;
092: this .quiescence = quiescence;
093: this .id = id;
094: subscriptions = new IncrementalSubscription[policy
095: .numPriorities()];
096: for (int i = 0; i < subscriptions.length; i++)
097: subscriptions[i] = (IncrementalSubscription) blackboard
098: .subscribe(ithPredicate(i));
099: Collection prev = blackboard.query(new UnaryPredicate() {
100: public boolean execute(Object o) {
101: if (!(o instanceof Storage))
102: return false;
103: return TaskScheduler.this .id.equals(((Storage) o).id);
104: }
105: });
106: if (prev.size() == 0) {
107: addedLists = setupLists();
108: changedLists = setupLists();
109: removedLists = setupLists();
110: resetCurrentPhase();
111: storage = new Storage();
112: updateStorage();
113: blackboard.publishAdd(storage);
114: } else {
115: storage = (Storage) prev.iterator().next();
116: addedLists = storage.addedLists;
117: changedLists = storage.changedLists;
118: removedLists = storage.removedLists;
119: currentPhase = storage.currentPhase;
120: }
121: }
122:
123: private ArrayList[][] setupLists() {
124: ArrayList[][] lists = new ArrayList[policy.numPriorities()][];
125: for (int i = 0; i < policy.numPriorities(); i++) {
126: int numPhases = policy.numPhases(i);
127: if (numPhases == 0)
128: if (logger.isErrorEnabled()) {
129: logger.error("There are no phases for priority "
130: + i + " in a task scheduler.");
131: }
132: lists[i] = new ArrayList[numPhases];
133: for (int j = 0; j < numPhases; j++)
134: lists[i][j] = new ArrayList();
135: }
136: return lists;
137: }
138:
139: private UnaryPredicate ithPredicate(final int priority) {
140: return new UnaryPredicate() {
141: public boolean execute(Object o) {
142: return ((o instanceof Task)
143: && outerFilter.execute((Task) o) && policy
144: .isPriority(priority, (Task) o));
145: }
146: };
147: }
148:
149: /**
150: * Plugins must call this each execute cycle before accessing any tasks,
151: * whether or not they access any tasks that cycle
152: */
153: public void initForExecuteCycle() {
154: if (logger.isDebugEnabled()) {
155: logger.debug("in initForExecuteCycle");
156: }
157: for (int i = 0; i < addedLists.length; i++) {
158: addItems(addedLists[i], subscriptions[i]
159: .getAddedCollection());
160: addItems(changedLists[i], subscriptions[i]
161: .getChangedCollection());
162: addItems(removedLists[i], subscriptions[i]
163: .getRemovedCollection());
164: }
165: }
166:
167: private void addItems(ArrayList[] lists, Collection items) {
168: if (lists.length == 0)
169: return;
170: lists[0].addAll(items);
171: if (!items.isEmpty()) {
172: currentPhase = 0;
173: if (logger.isDebugEnabled()) {
174: logger
175: .debug("in TaskScheduler addItems, calling clearQuiescentState");
176: }
177: quiescence.clearQuiescentState(this );
178: }
179: }
180:
181: // Default case - assumes that plugin processed tasks off of the task scheduler lists.
182: public void finishedExecuteCycle() {
183: finishedExecuteCycle(true);
184: }
185:
186: /**
187: * Plugins call this at end of execute cycle
188: *
189: * @param didProcessing true if the plugin processed the tasks and it's now safe to remove them
190: * from the task scheduler's task queue.
191: */
192: public void finishedExecuteCycle(boolean didProcessing) {
193: if (logger.isDebugEnabled()) {
194: logger.debug("in finishedExecuteCycle");
195: }
196: if (isEmpty()) {
197: return;
198: }
199: //only do these steps if we actually processed stuff (note the default is that we did)
200: if (didProcessing) {
201: shiftCollections(); //lists might be cleared here
202: currentPhase++;
203: }
204: // only requeue for execution if more to do
205: if (currentPhase < policy.getOrdering().length) {
206: blackboard.signalClientActivity();
207: if (logger.isDebugEnabled()) {
208: logger.debug("in finishedExecuteCycle, currentPhase: "
209: + currentPhase
210: + " policy.getOrdering().length: "
211: + policy.getOrdering().length);
212: }
213: } else {
214: resetCurrentPhase();
215: if (logger.isDebugEnabled()) {
216: logger
217: .debug("in TaskScheduler addItems, calling setQuiescentState");
218: }
219: quiescence.setQuiescentState(this );
220: }
221: updateStorage();
222: blackboard.publishChange(storage);
223: }
224:
225: private void updateStorage() {
226: storage.addedLists = addedLists;
227: storage.changedLists = changedLists;
228: storage.removedLists = removedLists;
229: storage.currentPhase = currentPhase;
230: storage.id = id;
231: }
232:
233: private static class Storage implements java.io.Serializable,
234: Publishable {
235: public String id;
236: public int currentPhase;
237: public ArrayList[][] addedLists;
238: public ArrayList[][] changedLists;
239: public ArrayList[][] removedLists;
240:
241: public boolean isPersistable() {
242: return true;
243: }
244: }
245:
246: private void resetCurrentPhase() {
247: currentPhase = policy.getOrdering().length;
248: }
249:
250: /**
251: * Clear out all the state information from the task scheduler
252: */
253: public void clearState() {
254: resetCurrentPhase();
255: for (int i = 0; i < addedLists.length; i++) {
256: for (int j = 0; j < addedLists[i].length; j++) {
257: addedLists[i][j].clear();
258: changedLists[i][j].clear();
259: removedLists[i][j].clear();
260: }
261: }
262: quiescence.setQuiescentState(this );
263: }
264:
265: /** check if anything to do in this scheduler */
266: public boolean isEmpty() {
267: return currentPhase >= policy.getOrdering().length;
268: }
269:
270: /** Tell the priority associated with the current processing phase */
271: public int getCurrentPriority() {
272: if (isEmpty())
273: return TaskSchedulingPolicy.UNKNOWN_PRIORITY;
274: return policy.getOrdering()[currentPhase].getPriority();
275: }
276:
277: /**
278: * Tell the time interval associate with the current phase
279: */
280: public TimeSpan getCurrentTimeSpan() {
281: if (isEmpty())
282: return null;
283: return policy.getOrdering()[currentPhase].getTimeSpan();
284: }
285:
286: private int getCurrentPhase() {
287: int pri = getCurrentPriority();
288: int phase = 0;
289: for (int i = 0; i < currentPhase; i++)
290: if (policy.getOrdering()[i].getPriority() == pri)
291: phase++;
292: return phase;
293: }
294:
295: private void shiftCollections() {
296: int pri = getCurrentPriority();
297: int phase = getCurrentPhase();
298: if (phase != (addedLists[pri].length - 1)) {
299: addedLists[pri][phase + 1].addAll(addedLists[pri][phase]);
300: changedLists[pri][phase + 1]
301: .addAll(changedLists[pri][phase]);
302: removedLists[pri][phase + 1]
303: .addAll(removedLists[pri][phase]);
304: }
305: addedLists[pri][phase].clear();
306: changedLists[pri][phase].clear();
307: removedLists[pri][phase].clear();
308: }
309:
310: /** iterates over all tasks */
311: public Iterator iterator() {
312: if (isEmpty())
313: return (new ArrayList(0)).iterator();
314: ArrayList al = new ArrayList(getAddedCollection());
315: al.addAll(getChangedCollection());
316: al.addAll(getRemovedCollection());
317: return al.iterator();
318: }
319:
320: /** All added tasks at given priority since last cleared */
321: public Collection getAddedCollection() {
322: if (isEmpty())
323: return new ArrayList(0);
324: return addedLists[getCurrentPriority()][getCurrentPhase()];
325: }
326:
327: /** All changed tasks at given priority since last cleared */
328: public Collection getChangedCollection() {
329: if (isEmpty())
330: return new ArrayList(0);
331: return changedLists[getCurrentPriority()][getCurrentPhase()];
332: }
333:
334: /** All removed tasks at given priority since last cleared */
335: public Collection getRemovedCollection() {
336: if (isEmpty())
337: return new ArrayList(0);
338: return removedLists[getCurrentPriority()][getCurrentPhase()];
339: }
340:
341: /** All tasks in the lists */
342: public Iterator getAllTasks() {
343: ArrayList al = new ArrayList();
344: for (int i = 0; i < addedLists.length; i++) {
345: for (int j = 0; j < addedLists[i].length; j++) {
346: al.addAll(addedLists[i][j]);
347: al.addAll(changedLists[i][j]);
348: al.addAll(removedLists[i][j]);
349: }
350: }
351: return al.iterator();
352: }
353:
354: public Collection getAllTasksCollection() {
355: List allTasks = new ArrayList();
356: for (int i = 0; i < subscriptions.length; i++) {
357: allTasks.addAll(subscriptions[i]);
358: }
359: return allTasks;
360: }
361: }
|