001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.mlm.plugin.perturbation;
027:
028: import java.util.Calendar;
029: import java.util.Date;
030: import java.util.Vector;
031:
032: import org.cougaar.util.ReusableThread;
033: import org.cougaar.util.ReusableThreadPool;
034:
035: /**
036: * The PerturbationScheduler class schedules the PerturbationNodes
037: * (or jobs) to be run as determined based on the Scenario Time.
038: */
039: public class PerturbationScheduler implements Runnable {
040: final public static long SECS_PER_MIN = 60;
041: final public static long MINS_PER_HOUR = 60;
042: final public static long HOURS_PER_DAY = 24;
043: final public static long DAYS_PER_WEEK = 7;
044: final public static long MILLI = 1000;
045: final public static long MILLISECONDS = SECS_PER_MIN * MILLI;
046:
047: final public static int MAX_CAPACITY = 100;
048: final public static int ONCE = 1;
049: final public static int FOREVER = -1;
050: final public static long HOURLY = MINS_PER_HOUR * MILLISECONDS;
051: final public static long DAILY = HOURS_PER_DAY * HOURLY;
052: final public static long WEEKLY = DAYS_PER_WEEK * DAILY;
053: final public static long MONTHLY = -1;
054: final public static long YEARLY = -2;
055:
056: private boolean done_;
057: private ReusableThreadPool threadPool_;
058: private Vector perturbations_ = new Vector(MAX_CAPACITY);
059: private PerturbationNode lnkUnnamed;
060:
061: /**
062: * @param perturbations The perturbations to be scheduled
063: */
064: public PerturbationScheduler(Vector perturbations) {
065: int initialPoolSize;
066:
067: // Get a reference to the reusable thread pool
068: // and start the perturbation scheduler using a
069: // reusable thread.
070:
071: setPerturbations(perturbations);
072: initialPoolSize = getPerturbations().size();
073: threadPool_ = (initialPoolSize > 0) ? ReusableThreadPool
074: .getDefaultThreadPool() : null;
075: ReusableThread thread = new ReusableThread(threadPool_);
076: thread.setRunnable(this );
077: thread.setDaemon(false);
078: thread.start();
079: }
080:
081: /**
082: * Sets the perturbations.
083: * @param theJobs The perturbations
084: */
085: private void setPerturbations(Vector theJobs) {
086: this .perturbations_ = theJobs;
087: setDone(false);
088: }
089:
090: /**
091: * Sets the done flag.
092: * @param flag Perturbation done flag
093: */
094: private void setDone(boolean flag) {
095: this .done_ = flag;
096: }
097:
098: /**
099: * Returns the perturbations.
100: * @return Returns the Perturbations.
101: */
102: private Vector getPerturbations() {
103: return this .perturbations_;
104: }
105:
106: /**
107: * Adds perturbation nodes to the scheduler's list of jobs.
108: * @param job erturbation node to be added
109: */
110: private synchronized void addPerturbation(PerturbationNode job) {
111: perturbations_.addElement(job);
112: notify();
113: }
114:
115: /**
116: * Removes the perturbation from the schedulers job list.
117: * @param job Perturbation node to be removed
118: */
119: private synchronized void deletePerturbation(PerturbationNode job) {
120: Vector jobs;
121: int remaining;
122:
123: jobs = getPerturbations();
124: remaining = jobs.size();
125: PerturbationPlugin p = null;
126: for (int i = 0; i < remaining; i++) {
127: if (((PerturbationNode) jobs.elementAt(i)) == job) {
128: jobs.removeElementAt(i);
129: remaining = jobs.size();
130: System.out
131: .println("\n<<<PerturbationPlugin>>> The number of perturbations "
132: + "remaining is: " + remaining);
133: if (remaining == 0) {
134: setDone(true);
135: }
136: break;
137: }
138: }
139: }
140:
141: /**
142: * Removes the perturbation from the schedulers job list.
143: * @param jobNumber Perturbation job number
144: */
145: private synchronized void deletePerturbation(int jobNumber) {
146: getPerturbations().removeElementAt(jobNumber);
147: }
148:
149: /**
150: * Updates the perturbation node to reflect the next iteration
151: * of the perturbation ( if any ).
152: * @return Returns the updated PerturbationNode
153: */
154: private PerturbationNode updatePerturbation(PerturbationNode pNode) {
155: Calendar cal = Calendar.getInstance();
156: cal.setTime(new Date(pNode.executeAt));
157:
158: if ((pNode.count - 1) >= 1) {
159: pNode.setCount(pNode.count - 1);
160: pNode.setExecutionTime(pNode.executeAt + pNode.interval);
161: } else {
162: return null;
163: }
164: return pNode;
165: }
166:
167: /**
168: * Runs the perturbations as scheduled based on the
169: * scenario time.
170: * @return Returns the number of milliseconds until the next perturbation should be run.
171: */
172: private synchronized long runPerturbations() {
173: long minDiff = Long.MAX_VALUE;
174: //long now = System.currentTimeMillis();
175: long now;
176: Vector p;
177:
178: p = getPerturbations();
179: for (int i = 0; i < p.size();) {
180: PerturbationNode pjob = (PerturbationNode) p.elementAt(i);
181:
182: now = ((pjob.subscriber).getClient()).currentTimeMillis();
183:
184: if (pjob.executeAt <= now) {
185: if (threadPool_ != null) {
186: ReusableThread pt = new ReusableThread(threadPool_);
187: pt.setRunnable(pjob.job);
188: pt.setDaemon(false);
189: pt.start();
190:
191: if (updatePerturbation(pjob) == null) {
192: deletePerturbation(pjob);
193: }
194: } else {
195: System.out
196: .println("\n<<<PerturbationPlugin..."
197: + pjob.threadId
198: + ">>> ERROR::There are no available threads to "
199: + "run the remaining perturbations");
200: }
201: } else {
202: long diff = pjob.executeAt - now;
203: minDiff = Math.min(diff, minDiff);
204: i++;
205: }
206: }
207: return minDiff;
208: }
209:
210: /**
211: * Cancels the perturbation.
212: */
213: public void cancel(PerturbationNode job) {
214: deletePerturbation(job);
215: }
216:
217: /**
218: * Starts and runs the Perturbation Scheduling thread.
219: */
220: public synchronized void run() {
221: System.out
222: .println("\n<<<PerturbationPlugin>>> Starting Scheduler..........");
223: while (true) {
224: long waitTime = runPerturbations();
225: try {
226: System.out.println("\n<<<PerturbationPlugin>>> "
227: + "Next Perturbation Scheduled to begin in "
228: + (waitTime / 1000) + " seconds ");
229: wait(waitTime);
230: } catch (Exception e) {
231: }
232: }
233: }
234:
235: //
236: // The following methods are not cuurently in use by the PerturbationPlugin.
237: //
238:
239: public void execute(PerturbationNode jobNode) {
240: executeIn(jobNode, jobNode.executeAt);
241: }
242:
243: public void executeIn(PerturbationNode jobNode, long millis) {
244: executeInAndRepeat(jobNode, millis, 1000, ONCE);
245: }
246:
247: public void executeInAndRepeat(PerturbationNode jobNode,
248: long millis, long repeat) {
249: executeInAndRepeat(jobNode, millis, repeat, FOREVER);
250: }
251:
252: public void executeInAndRepeat(PerturbationNode jobNode,
253: long millis, long repeat, int count) {
254: Date when = new Date(System.currentTimeMillis() + millis);
255: executeAtAndRepeat(jobNode, when, repeat, count);
256: }
257:
258: public void executeAt(PerturbationNode jobNode, Date when) {
259: executeAtAndRepeat(jobNode, when, 1000, ONCE);
260: }
261:
262: public void executeAtAndRepeat(PerturbationNode jobNode, Date when,
263: long repeat) {
264: executeAtAndRepeat(jobNode, when, repeat, FOREVER);
265: }
266:
267: public void executeAtAndRepeat(PerturbationNode jobNode, Date when,
268: long repeat, int count) {
269: jobNode.setExecutionTime(when.getTime());
270: jobNode.setInterval(repeat);
271: jobNode.setCount(count);
272: addPerturbation(jobNode);
273: }
274:
275: public void executeAtNextDOW(PerturbationNode jobNode, Date when,
276: int DOW) {
277: Calendar target = Calendar.getInstance();
278: target.setTime(when);
279: while (target.get(Calendar.DAY_OF_WEEK) != DOW)
280: target.add(Calendar.DATE, 1);
281: executeAt(jobNode, target.getTime());
282: }
283:
284: public void configureBackup(PerturbationNode job) {
285: Calendar now = Calendar.getInstance();
286: executeAtNextDOW(job, now.getTime(), Calendar.SUNDAY);
287: }
288: }
|