001: /*
002: * <copyright>
003: *
004: * Copyright 2001-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.logistics.plugin.trans;
027:
028: import java.util.*;
029:
030: import org.cougaar.glm.ldm.asset.Organization;
031:
032: import org.cougaar.lib.callback.UTILFilterCallback;
033: import org.cougaar.lib.callback.UTILGenericListener;
034: import org.cougaar.lib.callback.UTILWorkflowCallback;
035:
036: import org.cougaar.lib.vishnu.client.XMLizer;
037: import org.cougaar.lib.vishnu.client.custom.CustomVishnuAllocatorPlugin;
038:
039: import org.cougaar.logistics.ldm.Constants;
040:
041: import org.cougaar.planning.ldm.asset.Asset;
042:
043: import org.cougaar.planning.ldm.plan.Relationship;
044: import org.cougaar.planning.ldm.plan.RelationshipType;
045: import org.cougaar.planning.ldm.plan.Role;
046: import org.cougaar.planning.ldm.plan.Task;
047:
048: import org.cougaar.util.TimeSpan;
049: import org.cougaar.util.UnaryPredicate;
050:
051: import org.w3c.dom.Document;
052:
053: public class TranscomVishnuPlugin extends CustomVishnuAllocatorPlugin {
054: protected Set reportedIDs = new HashSet();
055: protected Set expectedIDs = new HashSet();
056: protected TranscomDataXMLize transcomDataXMLizer;
057: protected List delayedTasks = new ArrayList();
058: protected long waitTime = 10000; // millis
059:
060: public void localSetup() {
061: super .localSetup();
062:
063: addExpectedProviders();
064:
065: createXMLizer(getRunDirectly()); // we need one of these for getOrganizationRole
066: }
067:
068: protected void addExpectedProviders() {
069: try {
070: boolean needsAir = !getMyParams().hasParam(
071: "doesNotNeedAirTransportProvider");
072: String GLOBAL_AIR_ID = (getMyParams()
073: .hasParam("GlobalAirRole")) ? getMyParams()
074: .getStringParam("GlobalAirRole")
075: : "AirTransportationProvider";
076:
077: String GLOBAL_SEA_ID = (getMyParams()
078: .hasParam("GlobalSeaRole")) ? getMyParams()
079: .getStringParam("GlobalSeaRole")
080: : "SeaTransportationProvider";
081:
082: String NULL_ASSET_ID = (getMyParams()
083: .hasParam("NullAssetRole")) ? getMyParams()
084: .getStringParam("NullAssetRole") : "NullNomen";
085:
086: if (needsAir)
087: expectedIDs.add(GLOBAL_AIR_ID);
088:
089: expectedIDs.add(GLOBAL_SEA_ID);
090:
091: // the NULL asset isn't really important - just if we want to allocate tasks
092: // to it, which in modern societies, we never do - it's just a hack if we
093: // want something bogus to show up on a TPFDD.
094:
095: // expectedIDs.add (NULL_ASSET_ID);
096: } catch (Exception e) {
097: error("got really unexpected exception " + e);
098: }
099: }
100:
101: /**
102: * Is the task interesting to the plugin? This is the inner-most part of
103: * the predicate. <br>
104: * By default, it ignores tasks produced from this plugin <br>
105: * If you redefine this, it's good to call this using super.
106: *
107: * @param t - the task begin checked
108: * @see org.cougaar.lib.callback.UTILGenericListener#interestingTask
109: */
110: public boolean interestingTask(Task t) {
111: if (!super .interestingTask(t))
112: return false;
113: boolean hasTransport = t.getVerb().equals(
114: Constants.Verb.TRANSPORT);
115:
116: if (logger.isDebugEnabled())
117: logger
118: .debug("found "
119: + t.getUID()
120: + (hasTransport ? " interesting"
121: : " uninteresting"));
122:
123: return hasTransport;
124: }
125:
126: /**
127: * Sort through assets to make sure we have the proper subordinates.
128: *
129: * @param newAssets new assets found in the container
130: */
131: public void handleChangedAssets(Enumeration newAssets) {
132: handleNewAssets(newAssets);
133: }
134:
135: public void handleNewAssets(Enumeration newAssets) {
136: super .handleNewAssets(newAssets);
137: if (isInfoEnabled()) {
138: info("handleNewAssets - got called with "
139: + myNewAssets.size() + " assets.");
140: }
141:
142: for (Iterator iter = myNewAssets.iterator(); iter.hasNext();) {
143: String name = "";
144:
145: Asset asset = (Asset) iter.next();
146: boolean isOrg = false;
147: try {
148: if (asset instanceof Organization) {
149: name = getOrganizationRole(asset);
150: if (isInfoEnabled()) {
151: info("handleNewAssets - received subordinate org : "
152: + asset + "'s name is " + name);
153: }
154: } else {
155: name = asset.getTypeIdentificationPG()
156: .getNomenclature();
157: if (isDebugEnabled()) {
158: debug("handleNewAssets - " + asset
159: + " is NOT an org.");
160: }
161: }
162: } catch (Exception e) {
163: if (isWarnEnabled()) {
164: logger.warn("handleNewAssets - " + asset
165: + " was strange - ", e);
166: }
167: }
168:
169: if (name != null) {
170: if (expectedIDs.contains(name)) {
171: if (!reportedIDs.contains(name)) {
172: if (logger.isInfoEnabled())
173: logger.info(getName() + " - expected "
174: + name + " has reported.");
175: reportedIDs.add(name);
176: }
177: } else {
178: if (logger.isInfoEnabled())
179: logger.info("ignoring id " + name);
180: }
181: }
182: }
183: }
184:
185: /** calls TranscomDataXMLize */
186: protected String getOrganizationRole(Asset asset) {
187: return transcomDataXMLizer.getOrganizationRole(asset);
188: }
189:
190: /** use the TranscomDataXMLize XMLizer */
191: protected XMLizer createXMLizer(boolean direct) {
192: return (transcomDataXMLizer = new TranscomDataXMLize(direct,
193: logger, expectedIDs));
194: }
195:
196: protected Collection getAllAssets() {
197: if (!allNecessaryAssetsReported()) {
198: if (isWarnEnabled()) {
199: warn("Trying to find subordinates, despite not seeing the following in added lists...");
200:
201: reportMissingAssets();
202: }
203:
204: // send subscription contents through again...
205: Collection assets = getAssetCallback().getSubscription()
206: .getCollection();
207: List orgs = new ArrayList();
208:
209: for (Iterator iter = assets.iterator(); iter.hasNext();) {
210: Asset asset = (Asset) iter.next();
211: if (asset instanceof Organization) {
212: String name = getOrganizationRole(asset);
213: if (expectedIDs.contains(name)
214: && !reportedIDs.contains(name)) {
215: orgs.add(asset);
216: }
217: }
218: }
219:
220: if (!orgs.isEmpty()) {
221: // send them through again as though they appeared on the added list
222: if (isWarnEnabled()) {
223: warn("Found the missing subord(s) : " + orgs);
224: }
225:
226: handleNewAssets(Collections.enumeration(orgs));
227: }
228: }
229:
230: return super .getAllAssets();
231: }
232:
233: /**
234: * Overridden to provide check for missing assets. Calls super first.
235: *
236: * @param stuffToSend - initially the list of tasks to send to scheduler
237: * @param objectFormatDoc - optional object format used by data xmlizers
238: * to determine types for fields when running directly
239: */
240: protected void prepareData(List stuffToSend,
241: Document objectFormatDoc) {
242: super .prepareData(stuffToSend, objectFormatDoc);
243:
244: // localDidRehydrate - handleNewAssets is not called if we were rehydrated,
245: // so we would otherwise report spurious error.
246: if (!localDidRehydrate && !allNecessaryAssetsReported())
247: reportMissingAssets();
248: }
249:
250: public void handleAssignment(
251: org.cougaar.planning.ldm.plan.Task task, Asset asset,
252: Date start, Date end, Date setupStart, Date wrapupEnd,
253: String contribs, String taskText) {
254: Date best = prefHelper.getBestDate(task);
255: if (end.getTime() > best.getTime()
256: && end.getTime() < (best.getTime() + 1000l)) {
257: end = best;
258: }
259: super .handleAssignment(task, asset, start, end, setupStart,
260: wrapupEnd, contribs, taskText);
261: }
262:
263: protected boolean allNecessaryAssetsReported() {
264: return (reportedIDs.size() == expectedIDs.size());
265: }
266:
267: protected void reportMissingAssets() {
268: for (Iterator iter = expectedIDs.iterator(); iter.hasNext();) {
269: Object key = iter.next();
270: if (!reportedIDs.contains(key)) {
271: error(" - ERROR - missing expected asset with role "
272: + key);
273: }
274: }
275: }
276:
277: /**
278: * Calls processTasks if any delayed tasks left to process.
279: */
280: protected void execute() {
281: super .execute();
282:
283: // processTasks is ordinarily only called when tasks have accumulated
284: // in the buffer and the buffer dispatch condition has been met.
285: //
286: // This ensures that if there are delayed tasks, they are re-examined,
287: // whether there's something in the buffer or not.
288: //
289: // Fix for bug #13455
290:
291: if (!delayedTasks.isEmpty()) {
292: processTasks(new ArrayList());
293: }
294: }
295:
296: /**
297: * If necessary subordinates have not reported yet, accumulates tasks into
298: * a delayedTasks list, and asks to be kicked again in 10 seconds, by which
299: * time hopefully the subordinates have reported.
300: *
301: * Solves the race condition between tasks showing up and subordinates showing up.
302: * @param tasks to process
303: */
304: public void processTasks(List tasks) {
305: if (!allNecessaryAssetsReported()) { // if need subordinates aren't there yet, way 10 seconds
306: getAllAssets();
307: delayedTasks.addAll(tasks);
308:
309: if (logger.isInfoEnabled()) {
310: logger
311: .info(getName()
312: + " - necessary subords have not reported, so waiting "
313: + waitTime + " millis to process "
314: + delayedTasks.size() + " tasks.");
315: reportMissingAssets();
316: }
317:
318: examineBufferAgainIn(waitTime); // wait 10 seconds and check again
319: } else { // ok, all subords are here, lets go!
320: if (logger.isInfoEnabled()) {
321: logger
322: .info(getName()
323: + " - all necessary subords have reported, so processing "
324: + tasks.size() + " tasks.");
325: }
326:
327: tasks.addAll(delayedTasks);
328: delayedTasks.clear();
329: super.processTasks(tasks);
330: }
331: }
332: }
|