001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.lib.filter;
028:
029: import java.util.List;
030:
031: import org.cougaar.core.agent.service.alarm.Alarm;
032: import org.cougaar.core.component.ServiceRevokedEvent;
033: import org.cougaar.core.component.ServiceRevokedListener;
034: import org.cougaar.core.service.AgentIdentificationService;
035: import org.cougaar.core.service.QuiescenceReportService;
036: import org.cougaar.core.service.ThreadService;
037: import org.cougaar.core.thread.Schedulable;
038: import org.cougaar.lib.callback.UTILFilterCallback;
039: import org.cougaar.lib.callback.UTILGenericListener;
040: import org.cougaar.lib.util.UTILPreference;
041: import org.cougaar.lib.util.UTILVerify;
042: import org.cougaar.planning.ldm.plan.Task;
043:
044: /**
045: * <pre>
046: * Example implementation of a plugin that buffers tasks
047: * until a certain threshold is reached. processTasks is then called.
048: * allocators, expanders, and aggregators should all be derived from
049: * this instead of PluginAdapter; we can then turn on buffering at a higher
050: * level by changing the default for buffering from 1. (which is = to no
051: * buffering at all)
052: *
053: * Abstract because these are undefined:
054: *
055: * createThreadCallback -- half of the determination of the flavor of a plugin
056: * (Allocator, Expander, Aggregator). The other half is declaring them listeners
057: * of the right type.
058: * processTasks -- different for each plugin flavor
059: *
060: * </pre>
061: */
062: public abstract class UTILBufferingPluginAdapter extends
063: UTILPluginAdapter implements UTILBufferingPlugin {
064:
065: protected QuiescenceReportService qService;
066: protected AgentIdentificationService agentIDService;
067:
068: /**
069: * <pre>
070: * Start up the task buffering thread.
071: *
072: * Note that localSetup is called AFTER setupFilters, so
073: * getBufferingThread will not return null;
074: *
075: * IT IS CRUCIAL that this gets called with super () if this
076: * is overridden! Otherwise, plugin will be effectively dead.
077: * </pre>
078: */
079: public void localSetup() {
080: super .localSetup();
081:
082: // This is in setupSubscriptions -- no need to force to run again there....
083: // wakeUp ();
084: if (logger.isDebugEnabled())
085: logger.debug("Skipping call to wakeUp");
086:
087: verify = new UTILVerify(logger);
088: prefHelper = new UTILPreference(logger);
089: }
090:
091: /**
092: * Implemented for UTILBufferingPlugin
093: *
094: * OVERRIDE to specialize tasks you find interesting.
095: * @param t Task to check for interest
096: * @return boolean true if task is interesting
097: */
098: public boolean interestingTask(Task t) {
099: return true;
100: }
101:
102: /**
103: * <pre>
104: * Examines task to see if task looks like what the plugin
105: * expects it to look like.
106: *
107: * This is plugin-dependent. For example, the
108: * planning factor for unloading a ship is 2 days, but
109: * if the task's time window is 1 day, the task is
110: * not "well formed." Duration is a common test, but others
111: * could also be a good idea...
112: *
113: * This is an explicit contract with the plugin
114: * that feeds this plugins tasks, governing which tasks
115: * are possible for this plugin to handle.
116: *
117: * </pre>
118: * @param t Task to check for consistency
119: * @return true if task is OK
120: */
121: public boolean isTaskWellFormed(Task t) {
122: return true;
123: }
124:
125: protected void reportIllFormedTask(Task t) {
126: if (!verify.isTaskTimingCorrect(t))
127: info(getName() + ".reportIllFormedTask - task " + t
128: + " has " + verify.reportTimingError(t));
129: else if (!verify.hasDirectObject(t)) {
130: info(getName() + ".reportIllFormedTask - task "
131: + t.getUID() + " is missing direct object.");
132: } else if (!verify.hasStartPreference(t)) {
133: info(getName() + ".reportIllFormedTask - task "
134: + t.getUID() + " is start time preference.");
135: } else if (!verify.hasEndPreference(t)) {
136: info(getName() + ".reportIllFormedTask - task "
137: + t.getUID() + " is end time preference.");
138: } else {
139: info(getName() + ".reportIllFormedTask - task " + t
140: + " was ill formed. (start "
141: + prefHelper.getReadyAt(t) + " e "
142: + prefHelper.getEarlyDate(t) + " b "
143: + prefHelper.getBestDate(t) + " l "
144: + prefHelper.getLateDate(t) + ")");
145: }
146: }
147:
148: /**
149: * Note that setupFilters is called BEFORE localSetup, so
150: * getBufferingThread will not return null;
151: */
152: public void setupFilters() {
153: super .setupFilters();
154:
155: bufferingThread = createBufferingThread();
156:
157: if (isDebugEnabled())
158: debug(getName() + " creating buffering thread "
159: + bufferingThread);
160:
161: UTILFilterCallback threadCallback = createThreadCallback((UTILGenericListener) bufferingThread);
162:
163: addFilter(threadCallback);
164: }
165:
166: /**
167: * Provide the callback that is paired with the buffering thread, which is a
168: * listener. The buffering thread is the listener to the callback
169: *
170: * @return a FilterCallback with the buffering thread as its listener
171: */
172: protected abstract UTILFilterCallback createThreadCallback(
173: UTILGenericListener listener);
174:
175: /**
176: * The listening buffering thread communicates with the plugin
177: * across the UTILBufferingPlugin interface.
178: *
179: * This plugin is NOT a workflow listener.
180: *
181: * @return UTILListeningBufferingThread with this as the BufferingPlugin
182: * @see UTILBufferingPlugin
183: * @see UTILListeningBufferingThread
184: */
185: protected UTILBufferingThread createBufferingThread() {
186: return new UTILListeningBufferingThread(this , logger);
187: }
188:
189: /** accessor */
190: protected UTILBufferingThread getBufferingThread() {
191: return bufferingThread;
192: }
193:
194: /**
195: * Implemented for UTILBufferingPlugin
196: *
197: * Deal with the tasks that we have accumulated.
198: *
199: * ABSTRACT, so derived plugins must implement this.
200: *
201: * @param tasks List of tasks to handle
202: */
203: public abstract void processTasks(List tasks);
204:
205: public void resume() {
206: super .resume();
207:
208: if (isDebugEnabled())
209: debug("In resume about to call wakeUp");
210:
211: wakeUp();
212: }
213:
214: public void wakeUp() {
215: if (isInfoEnabled())
216: info(getName() + " wakeUp called.");
217:
218: examineBufferAgainIn(10l);
219: }
220:
221: public final void setQuiescenceReportService(
222: QuiescenceReportService qService) {
223: this .qService = qService;
224: }
225:
226: /** Buffering runnable wants to restart later */
227: public void examineBufferAgainIn(long delayTime) {
228: if (agentIDService == null) {
229: agentIDService = (AgentIdentificationService) getServiceBroker()
230: .getService(this , AgentIdentificationService.class,
231: new ServiceRevokedListener() {
232: public void serviceRevoked(
233: ServiceRevokedEvent re) {
234: debug("Agent id service revoked.");
235: }
236: });
237: qService.setAgentIdentificationService(agentIDService);
238: }
239:
240: // tell the world that we are busy
241: qService.clearQuiescentState();
242:
243: if (isInfoEnabled())
244: logger.info(getName() + " asking to be restarted in "
245: + delayTime); //, new Throwable());
246:
247: if (currentAlarm != null)
248: currentAlarm.cancel();
249:
250: long absTime = System.currentTimeMillis() + delayTime;
251: alarmService.addRealTimeAlarm(currentAlarm = new PluginAlarm(
252: absTime));
253: }
254:
255: /** lifted from PluginAdapter -- can't reuse it from there, sigh... */
256: public class PluginAlarm implements Alarm {
257: private long expiresAt;
258: private boolean expired = false;
259:
260: public PluginAlarm(long expirationTime) {
261: expiresAt = expirationTime;
262: }
263:
264: public long getExpirationTime() {
265: return expiresAt;
266: }
267:
268: public synchronized void expire() {
269: if (!expired) {
270: expired = true;
271: {
272: org.cougaar.core.service.BlackboardService bbs = getBlackboardService();
273: if (bbs != null)
274: bbs.signalClientActivity();
275: }
276: }
277: }
278:
279: public boolean hasExpired() {
280: return expired;
281: }
282:
283: public synchronized boolean cancel() {
284: boolean was = expired;
285: expired = true;
286: return was;
287: }
288:
289: public String toString() {
290: return "<PluginAlarm " + expiresAt
291: + (expired ? "(Expired) " : " ") + "for "
292: + UTILBufferingPluginAdapter.this .toString() + ">";
293: }
294: }
295:
296: /**
297: * Called every time one of the filterCallback subscriptions
298: * change.
299: *
300: * What the plugin does in response to a changed subscription.
301: *
302: * Directs the filterCallback with the changed subscription
303: * to react to the change in some way.
304: */
305: protected void execute() {
306: if (isDebugEnabled())
307: debug(getName()
308: + " : cycle called (a subscription changed)");
309:
310: super .execute();
311:
312: if (currentAlarm != null && currentAlarm.hasExpired()) {
313: if (isDebugEnabled())
314: debug("An alarm expired");
315: bufferingThread.run();
316:
317: if (!bufferingThread.anyTasksLeft()) {
318: // tell the world that we have completed
319: if (isInfoEnabled())
320: info(getName() + " now quiescent.");
321:
322: // FIXME: Could there be multiple alarms outstanding?
323: // Shouldnt I clear quiescence based on the alarm firing, not some task count?
324: qService.setQuiescentState();
325: }
326: }
327: }
328:
329: protected UTILBufferingThread bufferingThread = null;
330:
331: protected Schedulable schedulable;
332:
333: protected UTILVerify verify;
334: protected UTILPreference prefHelper;
335: protected ThreadService threadService;
336: protected PluginAlarm currentAlarm;
337: }
|