001: /*--------------------------------------------------------------------------
002: * <copyright>
003: *
004: * Copyright 1999-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: * --------------------------------------------------------------------------*/
026: package org.cougaar.glm.plugins;
027:
028: import org.cougaar.core.agent.service.alarm.Alarm;
029: import org.cougaar.core.blackboard.IncrementalSubscription;
030: import org.cougaar.core.mts.MessageAddress;
031: import org.cougaar.core.service.LoggingService;
032: import org.cougaar.glm.ldm.asset.Organization;
033: import org.cougaar.planning.ldm.PlanningFactory;
034: import org.cougaar.planning.ldm.plan.Expansion;
035: import org.cougaar.planning.ldm.plan.NewTask;
036: import org.cougaar.planning.ldm.plan.NewWorkflow;
037: import org.cougaar.planning.ldm.plan.PlanElement;
038: import org.cougaar.planning.ldm.plan.Relationship;
039: import org.cougaar.planning.ldm.plan.RelationshipSchedule;
040: import org.cougaar.planning.ldm.plan.Task;
041: import org.cougaar.planning.plugin.legacy.PluginDelegate;
042: import org.cougaar.planning.plugin.legacy.SimplePlugin;
043: import org.cougaar.util.UnaryPredicate;
044:
045: import java.util.Collection;
046: import java.util.Enumeration;
047: import java.util.HashSet;
048: import java.util.Hashtable;
049: import java.util.Iterator;
050: import java.util.Set;
051: import java.util.Vector;
052:
053: /**
054: * Defines common functions described in SimplePlugin. The plugin is decorated with the proper BasicProcessor at run
055: * time by the PluginDecorator.
056: * @see PluginDecorator
057: * @see BasicProcessor
058: */
059: public abstract class DecorationPlugin extends SimplePlugin {
060:
061: /**
062: * a vector of BasicProcessors for this plugin
063: */
064: protected Vector taskProcessors_;
065: /**
066: * this plugin's Organization cluster identifier
067: */
068: protected MessageAddress clusterId_;
069: protected boolean configured_ = false;
070: protected Organization myOrganization_ = null;
071: private IncrementalSubscription selfOrganizations_;
072: protected String className_;
073: protected Hashtable myParams_ = new Hashtable();
074: public int transCount_ = 0;
075: protected Vector plugInSubscriptions_ = new Vector();
076: protected Hashtable processorSubscriptions_ = new Hashtable();
077: protected boolean invoke_;
078: public final String SUPPLYTYPES = "SUPPLYTYPES";
079:
080: private HashSet consumers_ = null;
081: public org.cougaar.planning.ldm.plan.Role consumerRole_ = null;
082: private Set seenConsumers_ = new HashSet();
083: private Alarm customerAlarm_ = null;
084: /**
085: * Set this for additional delay after all customers have been seen
086: */
087: protected long extraCustomerDelay_ = 0L;
088: protected String myOrgName_;
089: private boolean allConsumersSeen_ = false;
090: private int selfConsumer = 0;
091: public static final String NEW_CUSTOMER_WAIT_PROP = "org.cougaar.glm.plugins.inventory.newCustomerWait";
092: public static final long NEW_CUSTOMER_WAIT_DEFAULT = 120000L;
093: public static long NEW_CUSTOMER_WAIT = Long.getLong(
094: NEW_CUSTOMER_WAIT_PROP, NEW_CUSTOMER_WAIT_DEFAULT)
095: .longValue();
096: protected LoggingService logger;
097:
098: public void setLoggingService(LoggingService loggingService) {
099: logger = loggingService;
100: }
101:
102: public DecorationPlugin() {
103: super ();
104: className_ = this .getClass().getName();
105: int indx = className_.lastIndexOf(".");
106: if (indx > -1) {
107: className_ = className_.substring(indx + 1);
108: }
109: taskProcessors_ = new Vector();
110: }
111:
112: private static UnaryPredicate orgsPredicate_ = new UnaryPredicate() {
113: public boolean execute(Object o) {
114: if (o instanceof Organization) {
115: return ((Organization) o).isSelf();
116: }
117: return false;
118: }
119: };
120:
121: /**
122: * Called by the PluginDecorator to set the task processor.
123: */
124: void addTaskProcessor(BasicProcessor task_processor) {
125: if (logger.isDebugEnabled()) {
126: logger.debug("Adding task processor "
127: + task_processor.getClass().toString());
128: }
129: taskProcessors_.addElement(task_processor);
130: }
131:
132: /**
133: * Must be called after the plugin has been initialized and loaded
134: */
135: private void configure() {
136: if (configured_) {
137: if (selfOrganizations_.getAddedList().hasMoreElements()) {
138: // We should handle changes in our org role and update processors accordingly.
139: if (logger.isErrorEnabled()) {
140: logger.error("orgs changed after decoration");
141: }
142: }
143: return;
144: }
145:
146: // look for this organization
147: if (myOrganization_ == null) {
148: Enumeration new_orgs = selfOrganizations_.elements();
149: if (new_orgs.hasMoreElements()) {
150: myOrganization_ = (Organization) new_orgs.nextElement();
151: myOrgName_ = myOrganization_.getItemIdentificationPG()
152: .getItemIdentification();
153: }
154: }
155:
156: if (myOrganization_ == null)
157: return;
158: // found the organization
159: // Grab plugin parameters for the decorator
160: readParameters();
161: decoratePlugin();
162: configured_ = true;
163: }
164:
165: private void reconfigure() {
166: configured_ = false;
167: configure();
168: }
169:
170: private int getConsumerCount() {
171: if (consumers_ == null) {
172: if (consumerRole_ == null) {
173: if (logger.isDebugEnabled()) {
174: logger.debug(" null consumerRole_");
175: }
176: return selfConsumer;
177: }
178: consumers_ = new HashSet();
179: RelationshipSchedule rs = myOrganization_
180: .getRelationshipSchedule();
181: Collection consumerRelations_ = rs
182: .getMatchingRelationships(consumerRole_);
183: Iterator consumerRelationIterator = consumerRelations_
184: .iterator();
185: /* Count consumers using a set in case of time-phased relationships */
186: if (logger.isDebugEnabled()) {
187: logger.debug(myOrgName_ + " adding consumers of "
188: + consumerRole_);
189: }
190: while (consumerRelationIterator.hasNext()) {
191: String customer = ((Organization) ((Relationship) consumerRelationIterator
192: .next()).getA()).getClusterPG()
193: .getMessageAddress().toString();
194: if (logger.isDebugEnabled()) {
195: logger.debug(myOrgName_ + " adding " + customer);
196: }
197: consumers_.add(customer);
198: }
199: }
200: return consumers_.size() + selfConsumer;
201: }
202:
203: public void recordCustomer(String consumer) {
204: if (allConsumersSeen_)
205: return;
206: if (seenConsumers_.add(consumer)) {
207: if (consumer.equals(myOrgName_))
208: selfConsumer = 1;
209: if (logger.isDebugEnabled()) {
210: logger.debug("####### Seeing new consumer: " + consumer
211: + " at supplier: ");
212: }
213: if (customerAlarm_ != null) {
214: /* Cancel current alarm */
215: customerAlarm_.cancel();
216: customerAlarm_ = null;
217: }
218: int consumerCount = getConsumerCount();
219: if (seenConsumers_.size() >= consumerCount) {
220: if (extraCustomerDelay_ > 0L) {
221: customerAlarm_ = wakeAfterRealTime(extraCustomerDelay_);
222: }
223: allConsumersSeen_ = true;
224: if (logger.isDebugEnabled()) {
225: logger.debug("####### All " + seenConsumers_.size()
226: + " of " + consumerCount
227: + " consumers seen at supplier: ");
228: }
229: } else {
230: if (logger.isDebugEnabled()) {
231: logger.debug("####### Only "
232: + seenConsumers_.size() + " of "
233: + consumerCount
234: + " consumers seen at supplier: ");
235: }
236: customerAlarm_ = wakeAfterRealTime(NEW_CUSTOMER_WAIT);
237: }
238: } else {
239: // System.out.println("####### Seeing old consumer: "+consumer+" at supplier: "+
240: // myOrganization_.getClusterPG().getMessageAddress().toString());
241: }
242: }
243:
244: /* Called when MaintainInventory tasks are rescinded. */
245: public void clearRecordedCustomers() {
246: allConsumersSeen_ = false;
247: selfConsumer = 0;
248: seenConsumers_.clear();
249: if (logger.isDebugEnabled()) {
250: logger
251: .debug("####### Removing seen consumers: at supplier: ");
252:
253: }
254: if (customerAlarm_ != null) {
255: /* Cancel current alarm */
256: customerAlarm_.cancel();
257: customerAlarm_ = null;
258: }
259: }
260:
261: public boolean hasSeenAllConsumers() {
262: if (customerAlarm_ != null) {
263: if (customerAlarm_.hasExpired()) {
264: customerAlarm_ = null;
265: if (!allConsumersSeen_) {
266: allConsumersSeen_ = true;
267: if (logger.isDebugEnabled()) {
268: logger
269: .debug("####### Tired of waiting at supplier: ");
270: }
271: } else {
272: if (logger.isDebugEnabled()) {
273: logger
274: .debug("####### Finished waiting at supplier: ");
275: }
276: }
277: } else {
278: return false; // Still waiting
279: }
280: }
281: return allConsumersSeen_;
282: }
283:
284: protected abstract void decoratePlugin();
285:
286: protected void setupSubscriptions() {
287: clusterId_ = getMessageAddress();
288: selfOrganizations_ = (IncrementalSubscription) subscribe(orgsPredicate_);
289: monitorPluginSubscription(selfOrganizations_);
290:
291: if (didRehydrate()) {
292: reconfigure();
293: }
294:
295: }
296:
297: public void monitorSubscription(BasicProcessor bp,
298: IncrementalSubscription subscript) {
299: Vector subscriptions = (Vector) processorSubscriptions_.get(bp);
300: if (subscriptions == null) {
301: subscriptions = new Vector();
302: processorSubscriptions_.put(bp, subscriptions);
303: }
304: subscriptions.add(subscript);
305: }
306:
307: public void monitorPluginSubscription(
308: IncrementalSubscription subscript) {
309: plugInSubscriptions_.add(subscript);
310: }
311:
312: public boolean isSubscriptionChanged(IncrementalSubscription sub) {
313: return (sub.getChangedList().hasMoreElements()
314: || sub.getAddedList().hasMoreElements() || sub
315: .getRemovedList().hasMoreElements());
316: }
317:
318: public Object subscriptionChangedObject(IncrementalSubscription sub) {
319: if (sub.getChangedList().hasMoreElements()) {
320: return sub.getChangedList().nextElement();
321: } else if (sub.getAddedList().hasMoreElements()) {
322: return sub.getAddedList().nextElement();
323: } else if (sub.getRemovedList().hasMoreElements()) {
324: return sub.getRemovedList().nextElement();
325: } else {
326: return null;
327: }
328: }
329:
330: public boolean areSubscriptionsChanged() {
331: if (areSubscriptionsChanged(plugInSubscriptions_)) {
332: return true;
333: }
334: Enumeration en = processorSubscriptions_.elements();
335: Vector subscriptions;
336: while (en.hasMoreElements()) {
337: subscriptions = (Vector) en.nextElement();
338: if (areSubscriptionsChanged(subscriptions)) {
339: return true;
340: }
341: }
342: return false;
343: }
344:
345: public boolean areSubscriptionsChanged(Vector subscriptions) {
346: Enumeration en = subscriptions.elements();
347: while (en.hasMoreElements()) {
348: if (isSubscriptionChanged((IncrementalSubscription) en
349: .nextElement())) {
350: return true;
351: }
352: }
353: return false;
354: }
355:
356: /**
357: * Invokes all of the processors used to decorate this plugin. The first time execute() is invoked, it configures the
358: * plugin by setting the task processor, and unsubscribing to 'self' (subscribed in setSubscriptions()). subclasses
359: * need to call runProcessors()!
360: */
361: public void execute() {
362: if (!areSubscriptionsChanged() && !wasAwakened()) {
363: if (logger.isDebugEnabled()) {
364: logger
365: .debug(" execute without timer or subscription change !!!!");
366: }
367: invoke_ = false;
368: return;
369: }
370: invoke_ = true;
371: transCount_ = transCount_ + 1;
372:
373: // Configures the task processor list when my organization is added
374: // to the list.
375: configure();
376: }
377:
378: protected void runProcessors() {
379: // Invoke each of the task processors.
380: BasicProcessor tp;
381: Vector subscriptions;
382: Enumeration e = taskProcessors_.elements();
383: boolean runProcessor = false;
384: if (areSubscriptionsChanged(plugInSubscriptions_)
385: || wasAwakened()) {
386: // wake all processors
387: while (e.hasMoreElements()) {
388: tp = (BasicProcessor) e.nextElement();
389: if (areSubscriptionsChangedPrint(plugInSubscriptions_)) {
390: //System.out.println("#### Plugin subscriptions caused running of processor: "+tp+"####");
391: tp.update();
392: runProcessor = true;
393: } else if (wasAwakened()) {
394: //System.out.println("#### Alarm caused running of processor: "+tp+"####");
395: tp.update();
396:
397: // subscriptions = (Vector)processorSubscriptions_.get(tp);
398: // if (subscriptions != null) {
399: // if (areSubscriptionsChangedPrint(subscriptions)) {
400: // System.out.println("#### Running processor: "+tp+"####");
401: // tp.update();
402: // runProcessor = true;
403: // }
404: // }
405: } else {
406: //System.out.println("#### Running processor: "+tp+" with no subscriptions ####");
407: tp.update();
408: }
409:
410: }
411: } else {
412: // wake processors iff their subscriptions changed
413: while (e.hasMoreElements()) {
414: tp = (BasicProcessor) e.nextElement();
415: subscriptions = (Vector) processorSubscriptions_
416: .get(tp);
417: if (subscriptions != null) {
418: if (areSubscriptionsChangedPrint(subscriptions)) {
419: //System.out.println("#### Running processor: "+tp+"####");
420: tp.update();
421: runProcessor = true;
422: }
423: }
424: }
425: }
426: if (false && !runProcessor) {
427: if (logger.isDebugEnabled()) {
428: logger.debug("NO PROCESSOR RUN!!!");
429: }
430: }
431: }
432:
433: public boolean areSubscriptionsChangedPrint(Vector subscriptions) {
434: Enumeration en = subscriptions.elements();
435: while (en.hasMoreElements()) {
436: IncrementalSubscription is = (IncrementalSubscription) en
437: .nextElement();
438: if (isSubscriptionChanged(is)) {
439: //System.out.println("####### Subscription changed: "+subscriptionChangedObject(is));
440: return true;
441: }
442: }
443: return false;
444: }
445:
446: public boolean publishAddObject(Object obj) {
447: publishAdd(obj);
448: return true; // hack
449: }
450:
451: public void publishRemoveFromExpansion(Task subtask) {
452: NewWorkflow wf = (NewWorkflow) subtask.getWorkflow();
453: publishRemove(subtask);
454: if (wf == null)
455: return; // Already removed
456: wf.removeTask(subtask);
457: }
458:
459: public void publishAddToExpansion(Task parent, Task subtask) {
460: // Publish new task
461: publishAddObject(subtask);
462:
463: PlanElement pe = parent.getPlanElement();
464: Expansion expansion;
465: NewWorkflow wf;
466: ((NewTask) subtask).setParentTask(parent);
467: // Task has not been expanded, create an expansion
468: if (pe == null) {
469: PlanningFactory factory = getMyDelegate().getFactory();
470: // Create workflow
471: wf = (NewWorkflow) factory.newWorkflow();
472: wf.setParentTask(parent);
473: wf.setIsPropagatingToSubtasks(true);
474: wf.addTask(subtask);
475: ((NewTask) subtask).setWorkflow(wf);
476: // Build Expansion
477: expansion = factory.createExpansion(parent.getPlan(),
478: parent, wf, null);
479: // Publish Expansion
480: publishAdd(expansion);
481: }
482: // Task already has expansion, add task to the workflow and publish the change
483: else if (pe instanceof Expansion) {
484: expansion = (Expansion) pe;
485: wf = (NewWorkflow) expansion.getWorkflow();
486: wf.addTask(subtask);
487: ((NewTask) subtask).setWorkflow(wf);
488: publishChange(expansion);
489: } else {
490: if (logger.isDebugEnabled()) {
491: logger
492: .debug("publishAddToExpansion: problem pe not Expansion? "
493: + pe);
494: }
495: }
496: }
497:
498: public Alarm wakeAfterDelay(int delay) {
499: return wakeAfterRealTime(delay);
500: }
501:
502: public PluginDelegate getMyDelegate() {
503: return this .getDelegate();
504: }
505:
506: public boolean rehydrated() {
507: return didRehydrate();
508: }
509:
510: protected void readParameters() {
511: Vector p = getParameters();
512:
513: if (p.isEmpty()) {
514: if (logger.isDebugEnabled()) {
515: logger
516: .debug("getParameters(), Using default decorator");
517: }
518: // no parameters to read so our work here is done
519: return;
520: }
521: for (Enumeration e = p.elements(); e.hasMoreElements();) {
522: String s = (String) e.nextElement();
523: if (s.charAt(0) == '+') {
524: myParams_.put(new String(s.substring(1)), new Boolean(
525: true));
526: if (logger.isDebugEnabled()) {
527: logger.debug("readParameters(), adding "
528: + s.substring(1));
529: }
530: } else if (s.charAt(0) == '-') {
531: myParams_.put(new String(s.substring(1)), new Boolean(
532: false));
533: if (logger.isDebugEnabled()) {
534: logger.debug("readParameters(), removing "
535: + s.substring(1));
536: }
537: } else if (s.indexOf('=') != -1) {
538: int i = s.indexOf('=');
539: String key = new String(s.substring(0, i));
540: String value = new String(s
541: .substring(i + 1, s.length()));
542: myParams_.put(key.trim(), value.trim());
543: } else {
544: Vector types = (Vector) myParams_.get(SUPPLYTYPES);
545: if (types == null) {
546: types = new Vector();
547: myParams_.put(SUPPLYTYPES, types);
548: }
549: types.add(s);
550: if (logger.isDebugEnabled()) {
551: logger
552: .debug("readParameters(), assuming a demand type parameter: "
553: + s);
554: }
555: }
556: }
557: }
558:
559: public Object getParam(String key) {
560: return myParams_.get(key);
561: }
562: }
|