001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file ../GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015: package org.griphyn.vdl.router;
016:
017: import org.griphyn.vdl.classes.*;
018: import org.griphyn.vdl.util.*;
019: import org.griphyn.vdl.dax.*;
020:
021: // java.util.List clashes with org.griphyn.vdl.classes.List
022: import java.util.HashMap;
023: import java.util.Vector;
024: import java.util.HashSet;
025: import java.util.Collection;
026: import java.util.Set;
027: import java.util.TreeSet;
028: import java.util.Iterator;
029: import java.util.ArrayList;
030: import java.util.HashSet;
031: import java.util.Random;
032: import java.io.*;
033:
034: /**
035: * This class stores state when constructing the DAG for output. It
036: * is expected that for each DAG generation, one instance of this
037: * class performs the state tracking. The class is tightly coupled
038: * to the class that performs the routing.
039: *
040: * @author Jens-S. Vöckler
041: * @author Yong Zhao
042: * @version $Revision: 50 $
043: * @see Route
044: */
045: public class BookKeeper {
046: /**
047: * This contains the set of visited derivations.
048: */
049: private HashMap m_visited;
050:
051: /**
052: * This map tracks the set of parents for each child derivation.
053: * Each value item in the map is a set.
054: */
055: private HashMap m_parent;
056:
057: /**
058: * The content of the abstract DAG for XML (DAX) is created after
059: * the alternatives are found. Maybe in a later version, it will
060: * be created in parallel with adding derivations?
061: */
062: private ADAG m_dax;
063:
064: /**
065: * Unfortunately, for this release, we need to map the shortID of
066: * a derivation to some number that is acceptable to NMTOKEN, ID and
067: * IDREF.
068: */
069: private SequenceMapping m_mapping;
070:
071: /**
072: * The <code>Profile</code> elements in nested compound TR can
073: * be equally nested.
074: */
075: private ListStack m_profileStack;
076:
077: /**
078: * The names of compound transformations that we got through. This
079: * data will also be added to the job description once a job is added
080: * to the book-keeper.
081: */
082: private ArrayList m_transformations;
083:
084: /**
085: * The temporary files must be unique on a per-workflow basis.
086: */
087: private HashSet m_tempfiles;
088:
089: /**
090: * To create temporary filenames, we need an entropy source.
091: */
092: private Random m_rng;
093:
094: /**
095: * Default ctor.
096: */
097: public BookKeeper() {
098: this .m_parent = new HashMap();
099: this .m_visited = new HashMap();
100: this .m_dax = new ADAG();
101: this .m_mapping = new SequenceMapping();
102: this .m_profileStack = new ListStack();
103: this .m_transformations = new ArrayList();
104: this .m_tempfiles = new HashSet();
105: this .m_rng = new Random();
106: }
107:
108: /**
109: * Accessor: push a vector of profiles down the profile stack.
110: * @param profiles is a list of profiles, may be empty.
111: * @see #popProfile()
112: */
113: public void pushProfile(java.util.List profiles) {
114: this .m_profileStack.push(profiles);
115: }
116:
117: /**
118: * Accessor: pop a vector of profiles from the profile stack.
119: * @return the last active vector of profiles, may be empty.
120: * @see #pushProfile( java.util.List )
121: */
122: public java.util.List popProfile() {
123: return this .m_profileStack.pop();
124: }
125:
126: /**
127: * Accessor: obtains all profiles on the profile stack.
128: * @return all stacked profiles in one structure, may be empty.
129: */
130: public java.util.List getAllProfiles() {
131: return this .m_profileStack.flatten();
132: }
133:
134: /**
135: * Accessor: push the FQDI of a transformation onto a stack.
136: * @param fqdi is the fully-qualified definition identifier.
137: * @see #popTransformation()
138: */
139: public void pushTransformation(String fqdi) {
140: this .m_transformations.add(fqdi);
141: }
142:
143: /**
144: * Accessor: pop an FQDI from the stack of transformations.
145: * @return the last FQDI pushed, may be empty for an empty stack.
146: * @see #pushTransformation( String )
147: */
148: public String popTransformation() {
149: int pos = this .m_transformations.size() - 1;
150: if (pos >= 0)
151: return (String) this .m_transformations.remove(pos);
152: else
153: return null;
154: }
155:
156: /**
157: * Accessor: obtains all transformations that were pushed as space
158: * separated string.
159: *
160: * @return all transformation names, may be empty.
161: * @see #addJob( Job )
162: */
163: public String getAllTransformations() {
164: StringBuffer result = new StringBuffer();
165:
166: for (Iterator i = this .m_transformations.iterator(); i
167: .hasNext();) {
168: result.append((String) i.next());
169: if (i.hasNext())
170: result.append(' ');
171: }
172:
173: return result.toString();
174: }
175:
176: /**
177: * Accessor: Returns the constructed internal DAX keeper.
178: *
179: * @return the abstract DAX description. Note that this may be empty!
180: */
181: public ADAG getDAX(String name) {
182: this .m_dax.setName(name);
183: return this .m_dax;
184: }
185:
186: /**
187: * Accessor: Appends a job definition to the DAX structure.
188: */
189: public boolean addJob(Job job) {
190: return this .m_dax.addJob(job);
191: }
192:
193: /**
194: * This method updates the "cursor" position with a new derivation.
195: * The cursor helps tracking derivations while a DAG is being produced.
196: *
197: * @param dv is the new current derivation that the cursor will be set to.
198: * @see #getCurrent()
199: */
200: public void setCurrent(HasPass dv) {
201: // assign a new cursor
202: Logging.instance().log("state", 2,
203: "NOOP: setting cursor to " + dv.identify());
204: }
205:
206: /**
207: * Obtains the current cursor position.
208: * @return the derivation that the cursor is located at.
209: * @see #setCurrent( HasPass )
210: */
211: public HasPass getCurrent() {
212: throw new RuntimeException("method not implemented");
213: }
214:
215: /**
216: * Maps a DV identification to a name that can be put into the XML
217: * datatypes NMTOKEN, ID and IDREF. The identification used to be
218: * the DV's short ID, but recent changes use the full ID.
219: *
220: * @param id is the derivation identifier
221: * @return an XML-compatible job id
222: */
223: public String mapJob(String id) {
224: String s = this .m_mapping.forward(id);
225: Logging.instance().log("state", 3, "mapJob(" + id + ") = " + s);
226: return s;
227: }
228:
229: /**
230: * Obtains an existing mapping of a DV indentification to a job id.
231: * The identification used to be the DV's short ID, but recent changes
232: * use the full ID.
233: *
234: * @param id is the derivation identifier
235: * @return a job identifier, or null if no such mapping exists.
236: */
237: public String jobOf(String id) {
238: String s = this .m_mapping.get(id);
239: Logging.instance().log("state", 3,
240: "jobOf(" + id + ") = " + (s == null ? "(null)" : s));
241: return s;
242: }
243:
244: /**
245: * Accessor: Obtains the level of a given job from the DAX structure.
246: * @param jobid is the job identifier
247: * @return the level of the job, or -1 in case of error (no such job).
248: */
249: private int getJobLevel(String jobid) {
250: Job j = this .m_dax.getJob(jobid);
251: return (j == null ? -1 : j.getLevel());
252: }
253:
254: /**
255: * Accessor: Recursively adjusts job level backwards.
256: *
257: * @see org.griphyn.vdl.dax.ADAG#adjustLevels( String, int )
258: */
259: private int adjustJobLevels(String startjob, int distance) {
260: return this .m_dax.adjustLevels(startjob, distance);
261: }
262:
263: /**
264: * Adds a parent set for the given element to the state
265: * @param current is the current derivation to add parents for
266: * @param parents are the parents, any number, to add for the given element
267: * @return true, if the parent was not know before or the currelem is empty,
268: * false, if the parent was added previously.
269: */
270: public boolean addParent(HasPass current, Set parents) {
271: String id = current.identify();
272:
273: if (parents.size() == 0) {
274: // nothing to do
275: Logging.instance().log("state", 4, "no parents for " + id);
276: return false;
277: }
278:
279: if (!this .m_parent.containsKey(id)) {
280: Logging.instance().log("state", 4,
281: "adding new parent space for " + id);
282: this .m_parent.put(id, new TreeSet());
283: }
284:
285: // check for modifications
286: boolean modified = ((Set) this .m_parent.get(id))
287: .addAll(parents);
288: if (modified) {
289: Logging.instance().log(
290: "state",
291: 4,
292: "CHILD " + id + " PARENT "
293: + this .m_parent.get(id).toString());
294:
295: // add to parent/child DAX relationship // OLD
296: // NEW: adjust job level count as necessary
297: String jobid = mapJob(id); // OLD
298: int level = getJobLevel(jobid);
299: for (Iterator i = parents.iterator(); i.hasNext();) { // OLD
300: String pid = mapJob((String) i.next()); // OLD
301: this .m_dax.addChild(jobid, pid); // OLD
302: int plevel = getJobLevel(pid);
303: if (level != -1 && plevel != -1 && level >= plevel) {
304: int adjustment = level - plevel + 1;
305: Logging.instance().log(
306: "state",
307: 3,
308: "adjusting levels starting at " + pid
309: + " by " + adjustment);
310:
311: adjustJobLevels(pid, adjustment);
312: }
313: }
314: }
315:
316: return modified;
317: }
318:
319: /**
320: * This method marks a derivation as visited in the set of visited nodes.
321: *
322: * @param current is the caller to add to the visited node set.
323: * @param real is the value to add for the given caller.
324: * @return <code>true</code>, if the caller was previously unknown
325: * to the visited set.
326: */
327: public boolean addVisited(HasPass current, Set real) {
328: String id = current.identify();
329: boolean exists = this .m_visited.containsKey(id);
330: if (exists) {
331: Set temp = (Set) this .m_visited.get(id);
332: Logging.instance().log("state", 2,
333: "already visited node " + id + " keeping " + temp);
334: } else {
335: this .m_visited.put(id, real);
336: Logging.instance().log("state", 2,
337: "adding visited node " + id + " with " + real);
338: }
339: return exists;
340: }
341:
342: /**
343: * Checks if a node was previously visited. The visited nodes have to
344: * be tracked in any kind of breadth first and depth first search.
345: *
346: * @param dv is the derivation to check, if it was visited before.
347: * @return true, if the derivation was already visited previously.
348: */
349: public boolean wasVisited(HasPass dv) {
350: String id = dv.identify();
351: boolean result = this .m_visited.containsKey(id);
352: Logging.instance().log("state", 5,
353: "wasVisited(" + id + ") = " + result);
354: return result;
355: }
356:
357: public Set getVisited(HasPass dv) {
358: String id = dv.identify();
359: Set result = (Set) this .m_visited.get(id);
360: if (result == null)
361: result = new TreeSet();
362: Logging.instance().log("state", 5,
363: "getVisited(" + id + ") = " + result);
364: return result;
365: }
366:
367: /**
368: * This method helps to track the input and output files. From this
369: * information, the input, intermediary and output files of the
370: * complete DAG can be constructed. This method does allow for inout.
371: *
372: * @param lfnset is a set of LFN instances, which encapsulate their
373: * respective linkage.
374: * @see java.util.AbstractCollection#addAll( java.util.Collection )
375: */
376: public void addFilenames(Collection lfnset) {
377: if (lfnset == null)
378: return; // nothing to do
379:
380: for (Iterator i = lfnset.iterator(); i.hasNext();) {
381: LFN lfn = (LFN) i.next();
382: String name = lfn.getFilename();
383: if (lfn.getLink() == LFN.INPUT
384: || lfn.getLink() == LFN.INOUT) {
385: this .m_dax.addFilename(name, true, lfn.getTemporary(),
386: lfn.getDontRegister(), lfn.getDontTransfer());
387: Logging.instance().log("state", 5,
388: "adding input filename " + name);
389: }
390: if (lfn.getLink() == LFN.OUTPUT
391: || lfn.getLink() == LFN.INOUT) {
392: this .m_dax.addFilename(name, false, lfn.getTemporary(),
393: lfn.getDontRegister(), lfn.getDontTransfer());
394: Logging.instance().log("state", 5,
395: "adding output filename " + name);
396: }
397: }
398: }
399:
400: /**
401: * String constant describing the six X.
402: */
403: static final String XXXXXX = "XXXXXX";
404:
405: /**
406: * Where to draw filename characters from.
407: */
408: private static final String filenamechars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
409:
410: /**
411: * Creates a unique temporary filename. The new name is registered
412: * locally to ensure uniqueness. A string of multiple capitol X, at
413: * least six, is replaced with some random factor.
414: *
415: * @param hint is a filename hint.
416: * @param suffix is the suffix for the filename.
417: * @return a somewhat unique filename - for this workflow only.
418: */
419: public String createTempName(String hint, String suffix) {
420: String result = null;
421:
422: // sanity checks
423: if (hint == null || hint.length() == 0)
424: hint = XXXXXX;
425: if (suffix == null)
426: suffix = ".tmp";
427:
428: // where are the X
429: int where = hint.lastIndexOf(XXXXXX);
430: if (where == -1) {
431: hint += XXXXXX;
432: where = hint.lastIndexOf(XXXXXX);
433: }
434:
435: // create a changable buffer
436: StringBuffer sb = new StringBuffer(hint);
437:
438: // find end of substitution area
439: int last = where;
440: while (last < sb.length() && sb.charAt(last) == 'X')
441: ++last;
442:
443: // substitute until done (FIXME: may be a forever in rare cases)
444: for (;;) {
445: // create and substitute
446: for (int i = where; i < last; ++i) {
447: sb.setCharAt(i, filenamechars.charAt(m_rng
448: .nextInt(filenamechars.length())));
449: }
450:
451: // check uniqueness
452: if (suffix.length() > 0)
453: result = sb.toString() + suffix;
454: if (!m_tempfiles.contains(result)) {
455: m_tempfiles.add(result);
456: break;
457: }
458: }
459:
460: return result;
461: }
462:
463: /**
464: * Detects valid results in the ADAG as opposed to an empty shell.
465: *
466: * @return true, if the ADAG is an empty shell.
467: */
468: public boolean isEmpty() {
469: Iterator i = this .m_dax.iterateJob();
470: return (!i.hasNext());
471: }
472:
473: /**
474: * This method is a possibly more memory efficient version of
475: * constructing a DAG.
476: * @param stream is a generic stream to put textual results onto.
477: */
478: public void toString(Writer stream) throws IOException {
479: this .m_dax.toString(stream);
480: }
481:
482: /**
483: * dumps the state of this object into human readable format.
484: */
485: public String toString() {
486: return this .m_dax.toString();
487: }
488:
489: /**
490: * This method is a possibly more memory efficient version of
491: * constructing a DAX.
492: * @param stream is a generic stream to put XML results onto.
493: * @param indent is the initial indentation level.
494: * @param namespace is an optional XML namespace.
495: */
496: public void toXML(Writer stream, String indent, String namespace)
497: throws IOException {
498: this .m_dax.toXML(stream, indent, namespace);
499: }
500:
501: /**
502: * This method is a possibly more memory efficient version of
503: * constructing a DAX.
504: * @param stream is a generic stream to put XML results onto.
505: * @param indent is the initial indentation level.
506: */
507: public void toXML(Writer stream, String indent) throws IOException {
508: this .m_dax.toXML(stream, indent, null);
509: }
510:
511: /**
512: * Dumps the state of this object into machine readable XML.
513: * @param indent is the initial indentation level.
514: * @param namespace is an optional XML namespace.
515: */
516: public String toXML(String indent, String namespace) {
517: return this.m_dax.toXML(indent, namespace);
518: }
519: }
|