0001: package org.griphyn.cPlanner.toolkit;
0002:
0003: import java.io.BufferedReader;
0004: import java.io.StringReader;
0005: import java.io.FileReader;
0006: import java.net.URL;
0007: import java.util.LinkedList;
0008: import java.io.File;
0009: import java.util.List;
0010: import java.util.ArrayList;
0011: import java.util.Iterator;
0012: import java.util.Map;
0013: import java.util.HashMap;
0014:
0015: import javax.xml.parsers.DocumentBuilderFactory;
0016: import javax.xml.parsers.DocumentBuilder;
0017:
0018: import org.pasoa.common.Constants;
0019: import org.pasoa.pstructure.Record;
0020: import org.pasoa.pstructure.ActorStatePAssertion;
0021: import org.pasoa.pstructure.GlobalPAssertionKey;
0022: import org.pasoa.pstructure.InteractionKey;
0023: import org.pasoa.pstructure.InteractionPAssertion;
0024: import org.pasoa.pstructure.ObjectID;
0025: import org.pasoa.pstructure.RelationshipPAssertion;
0026: import org.pasoa.storeclient.ClientLib;
0027: import org.pasoa.util.httpsoap.WSAddressEndpoint;
0028: import org.w3c.dom.Document;
0029: import org.w3c.dom.Element;
0030: import org.w3c.dom.NodeList;
0031: import org.w3c.dom.Node;
0032:
0033: import org.xml.sax.InputSource;
0034: import java.io.StringWriter;
0035: import java.io.IOException;
0036: import java.io.Reader;
0037: import org.xml.sax.InputSource;
0038:
0039: public class PasoaProvenanceClient {
0040:
0041: /** change this to connect to the preserv server **/
0042: public static String URL = "http://localhost:8080/preserv-1.0";
0043: public static String XMLHEADER = "<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>";
0044: public static String CONDOR = "www.cs.wisc.edu/condor";
0045: public long filecount = 0;
0046: public static String documentationStyle = "http://www.pasoa.org/schemas/pegasus";
0047: public ClientLib clientLib = null;
0048: public URL provenanceStore = null;
0049: public String jobname = null;
0050: public String wf_label = null;
0051: public String wf_planned_time = null;
0052: public String transformation = null;
0053: public Element docelement = null;
0054: public Element daxelement = null;
0055: // public List input=null;
0056: // public List output=null;
0057: public List parents = null;
0058: public List children = null;
0059: public Map input = null;
0060: public Map output = null;
0061:
0062: public PasoaProvenanceClient(String url) {
0063: clientLib = new ClientLib();
0064: try {
0065: provenanceStore = new URL(url + "/record");
0066: } catch (Exception e) {
0067: System.err.println("Bad Bad Bad url");
0068: }
0069: }
0070:
0071: public PasoaProvenanceClient() {
0072: clientLib = new ClientLib();
0073: try {
0074: provenanceStore = new URL(URL + "/record");
0075: } catch (Exception e) {
0076: System.err.println("Bad Bad Bad url");
0077: }
0078:
0079: }
0080:
0081: public static void main(String[] args) throws Exception {
0082:
0083: PasoaProvenanceClient cle = null;
0084: String jobfile = null;
0085: String daxfile = null;
0086: String dagfile = null;
0087: String url = null;
0088: if (args.length < 3) {
0089: System.err.println("Usage: Client daxfile dagfile outfile");
0090: // System.err.println("Usage: Client daxfile dagfile preservurl");
0091: System.exit(1);
0092:
0093: } else if (args.length == 3) {
0094: jobfile = args[2];
0095: daxfile = args[0];
0096: dagfile = args[1];
0097: cle = new PasoaProvenanceClient();
0098:
0099: }
0100: /*}else {
0101: jobfile=args[0];
0102: daxfile=args[0];
0103: dagfile=args[2];
0104: url=args[3];
0105: cle = new PasoaProvenanceClient(url);
0106:
0107: }*/
0108: try {
0109: cle.jobname = (new File(jobfile)).getName().split("\\.out")[0];
0110: System.out.println("Processing job --- " + cle.jobname);
0111: cle.parseKickstartRecord(jobfile);
0112: cle.parseDag(dagfile);
0113: List newlist = new ArrayList();
0114: if (cle.parents != null && !cle.parents.isEmpty()) {
0115: System.out.println("Adding parents " + cle.parents);
0116: newlist.addAll(cle.parents);
0117: }
0118: if (cle.children != null && !cle.children.isEmpty()) {
0119: System.out.println("Adding children " + cle.children);
0120: newlist.addAll(cle.children);
0121: }
0122: System.out.println("Adding job " + cle.jobname);
0123:
0124: newlist.add(cle.jobname);
0125: System.out.println("Job List is " + newlist);
0126: cle.parseFiles(newlist);
0127: // cle.parseDaxFile(daxfile,newlist);
0128: // cle.parseInput();
0129: System.out.println("Inputs == " + cle.input);
0130: System.out.println("Outputs == " + cle.output);
0131:
0132: if (cle.jobname.startsWith("rc_tx")
0133: || (cle.jobname.startsWith("new_rc_tx"))) {
0134: InteractionKey ik = cle.transferInvocationInteraction();
0135: cle.transferCompletionInteraction(ik);
0136: } else if (cle.jobname.startsWith("new_rc_register")) {
0137: InteractionKey ik = cle.registerInvocationInteraction();
0138: cle.registerCompletionInteraction(ik);
0139: } else if (cle.jobname.endsWith("cdir")) {
0140: //write this handler
0141: } else if (cle.jobname.startsWith("cln_")) {
0142: //write this handler
0143: } else if (cle.jobname.endsWith("concat")) {
0144: //write this handler
0145: } else {
0146: InteractionKey ik = cle.jobInvocationInteraction();
0147: cle.jobCompletionInteraction(ik);
0148: }
0149: } catch (Exception e) {
0150: e.printStackTrace();
0151: }
0152:
0153: }
0154:
0155: private void parseDag(String file) throws Exception {
0156: BufferedReader bf = new BufferedReader(new FileReader(file));
0157: String line = null;
0158: while ((line = bf.readLine()) != null) {
0159: String[] list = null;
0160: if (line.startsWith("PARENT")) {
0161: list = line.split(" ");
0162: }
0163: if (list != null) {
0164: if (list[1].equalsIgnoreCase(jobname)) {
0165: if (children == null) {
0166: children = new ArrayList();
0167: }
0168: children.add(list[3]);
0169: }
0170: if (list[3].equalsIgnoreCase(jobname)) {
0171: if (parents == null) {
0172: parents = new ArrayList();
0173: }
0174: parents.add(list[1]);
0175: }
0176: }
0177: }
0178: bf.close();
0179: }
0180:
0181: private void parseKickstartRecord(String file) throws Exception {
0182: DocumentBuilderFactory dbf = DocumentBuilderFactory
0183: .newInstance();
0184: DocumentBuilder db = dbf.newDocumentBuilder();
0185: List records = extractToMemory(new File(file));
0186: if (records != null) {
0187: for (Iterator i = records.iterator(); i.hasNext();) {
0188:
0189: Document msgDoc = db.parse(new InputSource(
0190: new StringReader((String) i.next())));
0191: docelement = msgDoc.getDocumentElement();
0192: transformation = docelement
0193: .getAttribute("transformation");
0194: wf_label = docelement.getAttribute("wf-label");
0195: wf_planned_time = docelement.getAttribute("wf-stamp");
0196:
0197: }
0198: }
0199: }
0200:
0201: public List extractToMemory(java.io.File input) throws Exception {
0202: List result = new ArrayList();
0203: StringWriter out = null;
0204: // open the files
0205: int p1, p2, state = 0;
0206: try {
0207: BufferedReader in = new BufferedReader(
0208: new FileReader(input));
0209: out = new StringWriter(4096);
0210: String line = null;
0211: while ((line = in.readLine()) != null) {
0212: if ((state & 1) == 0) {
0213: // try to copy the XML line in any case
0214: if ((p1 = line.indexOf("<?xml")) > -1)
0215: if ((p2 = line.indexOf("?>", p1)) > -1) {
0216: // out.write( line, p1, p2+2 );
0217: System.out.println("state=" + state
0218: + ", seen <?xml ...?>");
0219: }
0220: // start state with the correct root element
0221: if ((p1 = line.indexOf("<invocation")) > -1) {
0222: if (p1 > 0)
0223: line = line.substring(p1);
0224: System.out.println("state=" + state
0225: + ", seen <invocation>");
0226: out.write(XMLHEADER);
0227: ++state;
0228: }
0229: }
0230: if ((state & 1) == 1) {
0231: out.write(line);
0232: if ((p1 = line.indexOf("</invocation>")) > -1) {
0233: System.out.println("state=" + state
0234: + ", seen </invocation>");
0235: ++state;
0236:
0237: out.flush();
0238: out.close();
0239: result.add(out.toString());
0240: out = new StringWriter(4096);
0241: }
0242: }
0243: }
0244:
0245: in.close();
0246: out.close();
0247: } catch (IOException ioe) {
0248: throw new Exception("While copying " + input.getPath()
0249: + " into temp. file: " + ioe.getMessage());
0250: }
0251:
0252: // some sanity checks
0253: if (state == 0)
0254: throw new Exception("File " + input.getPath()
0255: + " does not contain invocation records,"
0256: + " assuming failure");
0257: if ((state & 1) == 1)
0258: throw new Exception("File " + input.getPath()
0259: + " contains an incomplete invocation record,"
0260: + " assuming failure");
0261:
0262: // done
0263: return result;
0264: }
0265:
0266: private void parseFiles(List jobs) throws Exception {
0267: File infile = null;
0268: File outfile = null;
0269: List ilist = null;
0270: List temp = new ArrayList(jobs);
0271: for (Iterator i = temp.iterator(); i.hasNext();) {
0272: String job = (String) i.next();
0273: if (job.startsWith("rc_tx_")) {
0274: //this is for stagein jobs
0275: outfile = new File(job + ".out.lof");
0276: if (outfile.exists() && outfile.canRead()
0277: && outfile.length() != 0) {
0278: try {
0279: BufferedReader in = new BufferedReader(
0280: new FileReader(outfile));
0281: String str;
0282: while ((str = in.readLine()) != null) {
0283: if (output == null) {
0284: output = new HashMap();
0285: }
0286: if (!output.containsKey(job)) {
0287: output.put(job, new ArrayList());
0288: }
0289: ilist = (List) output.get(job);
0290: ilist.add(str);
0291: }
0292: in.close();
0293: } catch (IOException e) {
0294: }
0295: }
0296:
0297: } else if (job.startsWith("new_rc_tx_")) {
0298: //this is for stageout/inter tx jobs
0299: outfile = new File(job + ".out.lof");
0300: if (outfile.exists() && outfile.canRead()
0301: && outfile.length() != 0) {
0302: try {
0303: BufferedReader in = new BufferedReader(
0304: new FileReader(outfile));
0305: String str;
0306: while ((str = in.readLine()) != null) {
0307: if (input == null) {
0308: input = new HashMap();
0309: }
0310: if (!input.containsKey(job)) {
0311: input.put(job, new ArrayList());
0312: }
0313: ilist = (List) input.get(job);
0314: ilist.add(str);
0315: }
0316: in.close();
0317: } catch (IOException e) {
0318: }
0319: }
0320:
0321: } else if (job.startsWith("inter_tx_")) {
0322: outfile = new File(job + ".out.lof");
0323: if (outfile.exists() && outfile.canRead()
0324: && outfile.length() != 0) {
0325: try {
0326: BufferedReader in = new BufferedReader(
0327: new FileReader(outfile));
0328: String str;
0329: while ((str = in.readLine()) != null) {
0330: if (output == null) {
0331: output = new HashMap();
0332: }
0333: if (!output.containsKey(job)) {
0334: output.put(job, new ArrayList());
0335: }
0336: ilist = (List) output.get(job);
0337: ilist.add(str);
0338: if (input == null) {
0339: input = new HashMap();
0340: }
0341: if (!input.containsKey(job)) {
0342: input.put(job, new ArrayList());
0343: }
0344: ilist = (List) input.get(job);
0345: ilist.add(str);
0346: }
0347: in.close();
0348: } catch (IOException e) {
0349: }
0350: }
0351:
0352: } else if (job.startsWith("new_rc_register")) {
0353: BufferedReader bf = new BufferedReader(new FileReader(
0354: new File(job + ".in")));
0355: String line = null;
0356: while ((line = bf.readLine()) != null) {
0357: String lfn = null;
0358: lfn = line.split(" ")[0];
0359: if (input == null) {
0360: input = new HashMap();
0361: }
0362: if (!input.containsKey(job)) {
0363: input.put(job, new ArrayList());
0364: }
0365: ilist = (List) input.get(job);
0366: ilist.add(lfn);
0367: }
0368: bf.close();
0369: } else if (job.startsWith("cln_")) {
0370: //this is for cleanup jobs
0371: infile = new File(job + ".in.lof");
0372: if (infile.exists() && infile.canRead()
0373: && infile.length() != 0) {
0374: try {
0375: BufferedReader in = new BufferedReader(
0376: new FileReader(infile));
0377: String str;
0378: while ((str = in.readLine()) != null) {
0379:
0380: if (input == null) {
0381: input = new HashMap();
0382: }
0383: if (!input.containsKey(job)) {
0384: input.put(job, new ArrayList());
0385: }
0386: ilist = (List) input.get(job);
0387: ilist.add(str);
0388: }
0389: in.close();
0390: } catch (IOException e) {
0391: }
0392: }
0393: } else if (!job.endsWith("_cdir")) {
0394: //this is a regular job
0395: outfile = new File(job + ".out.lof");
0396: if (outfile.exists() && outfile.canRead()
0397: && outfile.length() != 0) {
0398: try {
0399: BufferedReader in = new BufferedReader(
0400: new FileReader(outfile));
0401: String str;
0402: while ((str = in.readLine()) != null) {
0403: if (output == null) {
0404: output = new HashMap();
0405: }
0406: if (!output.containsKey(job)) {
0407: output.put(job, new ArrayList());
0408: }
0409: ilist = (List) output.get(job);
0410: ilist.add(str);
0411: }
0412: in.close();
0413: } catch (IOException e) {
0414: }
0415: }
0416:
0417: infile = new File(job + ".in.lof");
0418: if (infile.exists() && infile.canRead()
0419: && infile.length() != 0) {
0420: try {
0421: BufferedReader in = new BufferedReader(
0422: new FileReader(infile));
0423: String str;
0424: while ((str = in.readLine()) != null) {
0425:
0426: if (input == null) {
0427: input = new HashMap();
0428: }
0429: if (!input.containsKey(job)) {
0430: input.put(job, new ArrayList());
0431: }
0432: ilist = (List) input.get(job);
0433: ilist.add(str);
0434: }
0435: in.close();
0436: } catch (IOException e) {
0437: }
0438: }
0439: }
0440: }
0441: }
0442:
0443: private void parseDaxFile(String file, List jobs) throws Exception {
0444: DocumentBuilderFactory dbf = DocumentBuilderFactory
0445: .newInstance();
0446: DocumentBuilder db = dbf.newDocumentBuilder();
0447: Document msgDoc = db.parse(new File(file));
0448: NodeList nlist = msgDoc.getElementsByTagName("job");
0449: List temp = new ArrayList(jobs);
0450: input = new HashMap();
0451: output = new HashMap();
0452:
0453: for (int i = 0; i < nlist.getLength(); i++) {
0454: String tempname = nlist.item(i).getAttributes()
0455: .getNamedItem("name").getNodeValue()
0456: + "_"
0457: + nlist.item(i).getAttributes().getNamedItem("id")
0458: .getNodeValue();
0459: if (temp.contains(tempname)) {
0460: temp.remove(tempname);
0461: NodeList uselist = nlist.item(i).getChildNodes();
0462: for (int j = 0; j < uselist.getLength(); j++) {
0463: if (uselist.item(j).getNodeName().equals("uses")) {
0464: Node n = uselist.item(j).getAttributes()
0465: .getNamedItem("link");
0466: if (n != null) {
0467: List ilist = null;
0468: String fname = uselist.item(j)
0469: .getAttributes().getNamedItem(
0470: "file").getNodeValue();
0471: if (n.getNodeValue().equalsIgnoreCase(
0472: "output")) {
0473:
0474: if (output == null) {
0475: output = new HashMap();
0476: ilist = new ArrayList();
0477: output.put(tempname, ilist);
0478: }
0479: if (!output.containsKey(tempname)) {
0480: output.put(tempname,
0481: new ArrayList());
0482: }
0483: ilist = (List) output.get(tempname);
0484: ilist.add(fname);
0485: } else {
0486: if (input == null) {
0487: input = new HashMap();
0488: }
0489: if (!input.containsKey(tempname)) {
0490: input
0491: .put(tempname,
0492: new ArrayList());
0493: }
0494: ilist = (List) input.get(tempname);
0495: ilist.add(fname);
0496: }
0497: }
0498:
0499: }
0500: }
0501: }
0502: }
0503: }
0504:
0505: private void parseInput() throws Exception {
0506: if (parents != null && !parents.isEmpty()) {
0507: for (Iterator p = parents.iterator(); p.hasNext();) {
0508: String tempjob = (String) p.next();
0509: if (tempjob.startsWith("rc_tx")
0510: || tempjob.startsWith("inter_tx")) {
0511: List ilist = null;
0512: if (output == null) {
0513: output = new HashMap();
0514: }
0515: if (!output.containsKey(tempjob)) {
0516: output.put(tempjob, new ArrayList());
0517: }
0518: ilist = (List) output.get(tempjob);
0519: BufferedReader bf = new BufferedReader(
0520: new FileReader(new File(tempjob + ".in")));
0521: String line = null;
0522: while ((bf.readLine()) != null) {
0523: bf.readLine();
0524: bf.readLine();
0525: line = bf.readLine();
0526: filecount++;
0527: String lfn = line.split("run\\d{4}/")[1];
0528: ilist.add(lfn);
0529: }
0530: bf.close();
0531: }
0532:
0533: }
0534: }
0535: if (children != null && !children.isEmpty()) {
0536: for (Iterator c = children.iterator(); c.hasNext();) {
0537: String tempjob = (String) c.next();
0538: if (tempjob.startsWith("new_rc_tx")
0539: || tempjob.startsWith("inter_tx")) {
0540: List ilist = null;
0541: if (input == null) {
0542: input = new HashMap();
0543:
0544: }
0545: if (!input.containsKey(tempjob)) {
0546: input.put(tempjob, new ArrayList());
0547: }
0548: ilist = (List) input.get(tempjob);
0549: BufferedReader bf = new BufferedReader(
0550: new FileReader(new File(tempjob + ".in")));
0551: String line = null;
0552: while ((bf.readLine()) != null) {
0553:
0554: line = bf.readLine();
0555: filecount++;
0556: String lfn = line.split("run\\d{4}/")[1];
0557: ilist.add(lfn);
0558: bf.readLine();
0559: bf.readLine();
0560: }
0561: bf.close();
0562:
0563: }
0564:
0565: }
0566: }
0567: if (jobname.startsWith("rc_tx")
0568: || jobname.startsWith("new_rc_tx")
0569: || jobname.startsWith("inter_tx")) {
0570: BufferedReader bf = new BufferedReader(new FileReader(
0571: new File(jobname + ".in")));
0572: String line = null;
0573: List ilist = null;
0574: while ((bf.readLine()) != null) {
0575: String lfn = null;
0576: line = bf.readLine();
0577: if (jobname.startsWith("new_rc_tx")
0578: || jobname.startsWith("inter_tx")) {
0579: lfn = line.split("run\\d{4}/")[1];
0580: }
0581: if (input == null) {
0582: input = new HashMap();
0583: }
0584: input.put(jobname, new ArrayList());
0585: if (input.containsKey(jobname)) {
0586: ilist = (List) input.get(jobname);
0587: }
0588: ilist.add(lfn);
0589: bf.readLine();
0590: line = bf.readLine();
0591: if (jobname.startsWith("rc_tx")
0592: || jobname.startsWith("inter_tx")) {
0593: lfn = line.split("run\\d{4}/")[1];
0594:
0595: }
0596: if (output == null) {
0597: output = new HashMap();
0598: }
0599: output.put(jobname, new ArrayList());
0600: if (output.containsKey(jobname)) {
0601: ilist = (List) output.get(jobname);
0602: }
0603: ilist.add(lfn);
0604:
0605: }
0606: bf.close();
0607: }
0608:
0609: if (jobname.startsWith("new_rc_register")) {
0610: BufferedReader bf = new BufferedReader(new FileReader(
0611: new File(jobname + ".in")));
0612: String line = null;
0613: List ilist = null;
0614: while ((line = bf.readLine()) != null) {
0615: String lfn = null;
0616: lfn = line.split(" ")[0];
0617: if (input == null) {
0618: input = new HashMap();
0619: }
0620: input.put(jobname, new ArrayList());
0621: if (input.containsKey(jobname)) {
0622: ilist = (List) input.get(jobname);
0623: }
0624: ilist.add(lfn);
0625: }
0626: bf.close();
0627: }
0628: }
0629:
0630: public InteractionKey jobInvocationInteraction() throws Exception {
0631:
0632: System.out
0633: .println("We now create the job Invocation interaction key");
0634:
0635: // Create addresses for the source and sink of the
0636: // interaction.
0637: WSAddressEndpoint source = new WSAddressEndpoint(CONDOR);
0638:
0639: WSAddressEndpoint sink = new WSAddressEndpoint(jobname);
0640:
0641: // Create an interactionId, this should be unique!
0642:
0643: String interactionId = wf_label + wf_planned_time + jobname;
0644: InteractionKey ik = new InteractionKey(source.getElement(),
0645: sink.getElement(), interactionId);
0646:
0647: System.out.println("Building p-assertions...");
0648:
0649: InteractionPAssertion ipa = createJobInvocationInteractionPAssertion();
0650:
0651: //setting sender type
0652: System.out
0653: .println("We are the sender/client view of the interaction");
0654: String vk = Constants.SENDER_VIEW_TYPE;
0655: System.out.println();
0656:
0657: //set asserter to CONDOR
0658:
0659: WSAddressEndpoint asserter = new WSAddressEndpoint(CONDOR);
0660:
0661: List records = new ArrayList();
0662:
0663: System.out
0664: .println("Creating Record objects for each p-assertion");
0665:
0666: Record recIpa = new Record(ipa, ik, vk, asserter.getElement());
0667: records.add(recIpa);
0668:
0669: //iterate over parents to create multiple rpa's
0670: RelationshipPAssertion rpa = null;
0671: Record recRpa = null;
0672: if (input.containsKey(jobname)) {
0673: List inputs = (List) input.get(jobname);
0674: // for(int i=0; i<inputs.size();i++){
0675: // Iterator j = inputs.iterator();
0676: int i = 0;
0677: for (Iterator j = inputs.iterator(); j.hasNext();) {
0678: String tempfile = (String) j.next();
0679: for (Iterator k = parents.iterator(); k.hasNext();) {
0680: String tempjob = (String) k.next();
0681: List templist = (List) output.get(tempjob);
0682: if (templist != null) {
0683: if (templist.contains(tempfile)) {
0684: i++;
0685: System.out
0686: .println("Parent Relationship *** file="
0687: + tempfile
0688: + " from="
0689: + jobname
0690: + " to="
0691: + tempjob);
0692: recRpa = new Record(
0693: createJobToTransferRelationshipPAssertion(
0694: tempfile, tempjob, i), ik,
0695: vk, asserter.getElement());
0696: records.add(recRpa);
0697: }
0698: }
0699: }
0700:
0701: }
0702: }
0703:
0704: System.out
0705: .println("Recording the p-assertions in provenance store "
0706: + provenanceStore);
0707:
0708: clientLib.record(records.iterator(), provenanceStore, true);
0709:
0710: System.out.println("sender p-assertions recorded");
0711: System.out.println();
0712:
0713: //setting reciever type
0714:
0715: System.out
0716: .println("We are the sender/client view of the interaction");
0717: vk = Constants.RECEIVER_VIEW_TYPE;
0718: System.out.println();
0719:
0720: //set asserter to Job
0721:
0722: asserter = new WSAddressEndpoint(jobname);
0723: recIpa = new Record(ipa, ik, vk, asserter.getElement());
0724:
0725: System.out
0726: .println("Recording the p-assertions in provenance store "
0727: + provenanceStore);
0728:
0729: clientLib.record(recIpa, provenanceStore);
0730:
0731: System.out.println("receiver p-assertions recorded");
0732: System.out.println();
0733:
0734: return ik;
0735:
0736: }
0737:
0738: public void jobCompletionInteraction(
0739: InteractionKey invocationinteractionkey) throws Exception {
0740:
0741: System.out
0742: .print("Creating Completion Interaction Key ....... ");
0743:
0744: // Create addresses for the source and sink of the
0745: // interaction.
0746: // Create an interactionId, this should be unique!
0747: WSAddressEndpoint source = new WSAddressEndpoint(jobname);
0748: WSAddressEndpoint sink = new WSAddressEndpoint(CONDOR);
0749: String interactionId = wf_label + wf_planned_time + jobname;
0750: InteractionKey ik = new InteractionKey(source.getElement(),
0751: sink.getElement(), interactionId);
0752: System.out.println("DONE");
0753:
0754: //setting sender type
0755: String vk = Constants.SENDER_VIEW_TYPE;
0756: //set asserter to be the job
0757: WSAddressEndpoint asserter = new WSAddressEndpoint(jobname);
0758:
0759: System.out.println("Building p-assertions ..... ");
0760: List records = new ArrayList();
0761:
0762: InteractionPAssertion ipa = createJobCompletionInteractionPAssertion();
0763: Record recIpa = new Record(ipa, ik, vk, asserter.getElement());
0764: records.add(recIpa);
0765: //iterate over files to create multiple rpa's
0766: RelationshipPAssertion rpa = null;
0767: Record recRpa = null;
0768: if (output.containsKey(jobname)) {
0769: int count = 0;
0770: for (Iterator i = ((List) output.get(jobname)).iterator(); i
0771: .hasNext();) {
0772: count++;
0773: recRpa = new Record(createJobRelationshipPAssertion(
0774: invocationinteractionkey, (String) i.next(),
0775: count), ik, vk, asserter.getElement());
0776: records.add(recRpa);
0777: }
0778: }
0779: ActorStatePAssertion apa = createActorStatePAssertion(0);
0780:
0781: Record recApa = new Record(apa, ik, vk, asserter.getElement());
0782:
0783: records.add(recApa);
0784: System.out
0785: .print("Recording the sender p-assertions in provenance store ..... ");
0786:
0787: clientLib.record(records.iterator(), provenanceStore, true);
0788:
0789: System.out.println("DONE");
0790:
0791: //setting reciever type
0792: vk = Constants.RECEIVER_VIEW_TYPE;
0793:
0794: //set asserter to CONDOR
0795:
0796: asserter = new WSAddressEndpoint(CONDOR);
0797: recIpa = new Record(ipa, ik, vk, asserter.getElement());
0798:
0799: records = new ArrayList();
0800: records.add(recIpa);
0801: /**
0802: * //iterate over children to create multiple rpa's
0803: * rpa = null;
0804: * recRpa=null;
0805: * List outputs = (List)output.get(jobname);
0806: * // for(int i=0; i<outputs.size();i++){
0807: * // Iterator j = outputs.iterator();
0808: * int i =0;
0809: * for(Iterator j=outputs.iterator();j.hasNext();){
0810: *
0811: * String tempfile=(String)j.next();
0812: * for(Iterator k = children.iterator();k.hasNext();){
0813: * String tempjob=(String)k.next();
0814: * List templist = (List)input.get(tempjob);
0815: * if(templist!=null){
0816: * if(templist.contains(tempfile)){
0817: * i++;
0818: * System.out.println("Child Relationship *** file="+tempfile+" from="+jobname+" to="+tempjob);
0819: *
0820: * recRpa=new Record(createJobToTransferRelationshipPAssertion(tempfile,tempjob,i ),ik,vk,asserter.getElement());
0821: * records.add(recRpa);
0822: * }
0823: * }
0824: * }
0825: *
0826: * }
0827: **/
0828: System.out
0829: .print("Recording the receiver p-assertions in provenance store ..... ");
0830:
0831: clientLib.record(recIpa, provenanceStore);
0832:
0833: System.out.println("Done");
0834:
0835: }
0836:
0837: public InteractionKey transferInvocationInteraction()
0838: throws Exception {
0839:
0840: System.out.print("Creating Invocation Interaction Key ..... ");
0841:
0842: // Create addresses for the source and sink of the
0843: // interaction.
0844: WSAddressEndpoint source = new WSAddressEndpoint(CONDOR);
0845: WSAddressEndpoint sink = new WSAddressEndpoint(jobname);
0846: String interactionId = wf_label + wf_planned_time + jobname;
0847: InteractionKey ik = new InteractionKey(source.getElement(),
0848: sink.getElement(), interactionId);
0849: System.out.println("Done");
0850:
0851: //setting sender type
0852: String vk = Constants.SENDER_VIEW_TYPE;
0853: //set asserter to CONDOR
0854: WSAddressEndpoint asserter = new WSAddressEndpoint(CONDOR);
0855:
0856: System.out.print("Building p-assertions ..... ");
0857: InteractionPAssertion ipa = createTransferInvocationInteractionPAssertion();
0858:
0859: List records = new ArrayList();
0860: Record recIpa = new Record(ipa, ik, vk, asserter.getElement());
0861: records.add(recIpa);
0862: if (!jobname.startsWith("rc_tx")) {
0863: //iterate over parents to create multiple rpa's
0864: RelationshipPAssertion rpa = null;
0865: Record recRpa = null;
0866: List inputs = (List) input.get(jobname);
0867: // for(int i=0; i<inputs.size();i++){
0868: // Iterator j = inputs.iterator();
0869: int i = 0;
0870: for (Iterator j = inputs.iterator(); j.hasNext();) {
0871: String tempfile = (String) j.next();
0872: for (Iterator k = parents.iterator(); k.hasNext();) {
0873: String tempjob = (String) k.next();
0874: List templist = (List) output.get(tempjob);
0875: if (templist != null) {
0876: if (templist.contains(tempfile)) {
0877: i++;
0878: // System.out.println("Parent Relationship *** file="+tempfile+" from="+jobname+" to="+tempjob);
0879: recRpa = new Record(
0880: createJobToTransferRelationshipPAssertion(
0881: tempfile, tempjob, i), ik,
0882: vk, asserter.getElement());
0883: records.add(recRpa);
0884: }
0885: }
0886: }
0887:
0888: }
0889: }
0890: System.out.println("Done");
0891: System.out
0892: .print("Recording the sender p-assertions in provenance store .......... ");
0893: clientLib.record(records.iterator(), provenanceStore, true);
0894: System.out.println("Done");
0895:
0896: //setting reciever type
0897: vk = Constants.RECEIVER_VIEW_TYPE;
0898: //set asserter to job type
0899: asserter = new WSAddressEndpoint(jobname);
0900: //add the interaction P assertion
0901: recIpa = new Record(ipa, ik, vk, asserter.getElement());
0902: System.out
0903: .print("Recording the receiver p-assertions in provenance store ........ ");
0904: clientLib.record(recIpa, provenanceStore);
0905: System.out.println("DONE");
0906: return ik;
0907:
0908: }
0909:
0910: public void transferCompletionInteraction(
0911: InteractionKey invocationinteractionkey) throws Exception {
0912:
0913: System.out
0914: .print("Creating Completion Interaction Key ....... ");
0915: // Create addresses for the source and sink of the
0916: // interaction.
0917: WSAddressEndpoint source = new WSAddressEndpoint(jobname);
0918: WSAddressEndpoint sink = new WSAddressEndpoint(CONDOR);
0919: String interactionId = wf_label + wf_planned_time + jobname;
0920: InteractionKey ik = new InteractionKey(source.getElement(),
0921: sink.getElement(), interactionId);
0922: System.out.println("Done");
0923:
0924: //setting sender type
0925: String vk = Constants.SENDER_VIEW_TYPE;
0926: //set asserter to the job type
0927: WSAddressEndpoint asserter = new WSAddressEndpoint(jobname);
0928:
0929: System.out.print("Building p-assertions ..... ");
0930: List records = new ArrayList();
0931:
0932: InteractionPAssertion ipa = createTransferCompletionInteractionPAssertion();
0933: Record recIpa = new Record(ipa, ik, vk, asserter.getElement());
0934: records.add(recIpa);
0935:
0936: //iterate over files to create multiple rpa's
0937: RelationshipPAssertion rpa = null;
0938: Record recRpa = null;
0939:
0940: //get this file number from the .in file
0941: //simon or paul will change the client.record method to take iterator of assertions instead of iterator of records.
0942: for (int i = 0; i < filecount; i++) {
0943: recRpa = new Record(createTransferRelationshipPAssertion(
0944: invocationinteractionkey, i), ik, vk, asserter
0945: .getElement());
0946: records.add(recRpa);
0947: }
0948:
0949: ActorStatePAssertion apa = createActorStatePAssertion(0);
0950: Record recApa = new Record(apa, ik, vk, asserter.getElement());
0951: records.add(recApa);
0952:
0953: System.out.println("DONE");
0954:
0955: System.out
0956: .print("Recording the sender p-assertions in provenance store ..... ");
0957: clientLib.record(records.iterator(), provenanceStore, true);
0958: System.out.println("Done");
0959:
0960: //setting reciever type
0961:
0962: vk = Constants.RECEIVER_VIEW_TYPE;
0963: //set asserter to CONDOR
0964: asserter = new WSAddressEndpoint(CONDOR);
0965:
0966: //adding the interaction p assertion
0967: recIpa = new Record(ipa, ik, vk, asserter.getElement());
0968: System.out
0969: .print("Recording the receiver p-assertions in provenance store .... ");
0970:
0971: clientLib.record(recIpa, provenanceStore);
0972: System.out.println("DONE");
0973:
0974: }
0975:
0976: private InteractionPAssertion createTransferInvocationInteractionPAssertion()
0977: throws Exception {
0978: // Create an interaction p-assertion
0979: // First we make a local p-assertion id and then
0980: // we make a documentationStyle. In this case we
0981: // call it verbatium.
0982: //
0983: // In most cases, you'll be grabing the messageBody from the message
0984: // being sent between parties. So a SOAP message, or a CORBA message.
0985: // With this example we'll just use a hard coded message body.
0986:
0987: String localPAssertionId = "1";
0988:
0989: // this message content will be obtained by parsing the transfer input files <jobid.in> and obtaining the source urls
0990:
0991: BufferedReader bf = new BufferedReader(new FileReader(new File(
0992: jobname + ".in")));
0993: String line = null;
0994: StringBuffer message = new StringBuffer(
0995: "<transfer xmlns=\"http://pegasus.isi.edu/schema/pasoa/content/transfer\">");
0996: while ((bf.readLine()) != null) {
0997: line = bf.readLine();
0998: filecount++;
0999: if (!jobname.startsWith("new_rc_tx")) {
1000: message.append("<filename>" + line + "</filename>");
1001: } else {
1002: String lfn = line.split("run\\d{4}/")[1];
1003:
1004: message.append("<filename file=\"" + lfn + "\">" + line
1005: + "</filename>");
1006: }
1007: bf.readLine();
1008: bf.readLine();
1009: }
1010: bf.close();
1011: message.append("</transfer>");
1012:
1013: // Convert it into a DOM Element
1014: DocumentBuilderFactory dbf = DocumentBuilderFactory
1015: .newInstance();
1016: DocumentBuilder db = dbf.newDocumentBuilder();
1017: Document msgDoc = db.parse(new InputSource(new StringReader(
1018: message.toString())));
1019: Element messageBody = msgDoc.getDocumentElement();
1020:
1021: InteractionPAssertion ipa = new InteractionPAssertion(
1022: localPAssertionId, documentationStyle, messageBody);
1023:
1024: return ipa;
1025: }
1026:
1027: private InteractionPAssertion createTransferCompletionInteractionPAssertion()
1028: throws Exception {
1029: // Create an interaction p-assertion
1030: // First we make a local p-assertion id and then
1031: // we make a documentationStyle. In this case we
1032: // call it verbatium.
1033: //
1034: // In most cases, you'll be grabing the messageBody from the message
1035: // being sent between parties. So a SOAP message, or a CORBA message.
1036: // With this example we'll just use a hard coded message body.
1037:
1038: String localPAssertionId = "1";
1039:
1040: // this message content will be obtained by parsing the transfer input files <jobid.in> and obtaining the destination urls
1041:
1042: BufferedReader bf = new BufferedReader(new FileReader(new File(
1043: jobname + ".in")));
1044: String line = null;
1045: StringBuffer message = new StringBuffer(
1046: "<transfer xmlns=\"http://pegasus.isi.edu/schema/pasoa/content/transfer\">");
1047: while ((line = bf.readLine()) != null) {
1048: bf.readLine();
1049: bf.readLine();
1050: line = bf.readLine();
1051: filecount++;
1052: if (jobname.startsWith("new_rc_tx")) {
1053: message.append("<filename>" + line + "</filename>");
1054: } else {
1055: String lfn = line.split("run\\d{4}/")[1];
1056: message.append("<filename file=\"" + lfn + "\">" + line
1057: + "</filename>");
1058: }
1059: }
1060: bf.close();
1061: message.append("</transfer>");
1062:
1063: // Convert it into a DOM Element
1064: DocumentBuilderFactory dbf = DocumentBuilderFactory
1065: .newInstance();
1066: DocumentBuilder db = dbf.newDocumentBuilder();
1067: Document msgDoc = db.parse(new InputSource(new StringReader(
1068: message.toString())));
1069: Element messageBody = msgDoc.getDocumentElement();
1070:
1071: InteractionPAssertion ipa = new InteractionPAssertion(
1072: localPAssertionId, documentationStyle, messageBody);
1073:
1074: return ipa;
1075: }
1076:
1077: private ActorStatePAssertion createActorStatePAssertion(long count)
1078: throws Exception {
1079: // Create an actor state p-assertion
1080: // Just like the interaction p-assertion this p-assertion
1081: // needs a local p-assertion id. Remember, all the p-assertions
1082: // in one view need a different id. Therefore, we give this assertion
1083: // the id of "2" instead of "1".
1084: //
1085: // Again you'll typically be getting some state from the actor,
1086: // translating it to XML to create the actor state p-assertion
1087: // In this example, we just use a hard coded string.
1088:
1089: String localPAssertionId = "aspa-" + count;
1090:
1091: ActorStatePAssertion asa = new ActorStatePAssertion(
1092: localPAssertionId, docelement);
1093:
1094: return asa;
1095: }
1096:
1097: private RelationshipPAssertion createTransferRelationshipPAssertion(
1098: InteractionKey invocationik, long index) throws Exception {
1099: // Create a relationship p-assertion
1100: // Again a different local p-assertion id
1101: //
1102: // We'll create a "usage" relationship between the interaction p-assertion
1103: // and the actor state p-assertion. This relationship says that
1104: // message represented by interaction p-assertion "1" used the actor state
1105: // represented by actor state p-assertion "2".
1106: // There are no data accessors or links so we pass null.
1107:
1108: // Create the information to identify the subject of the relationship
1109: // Remember, parameter names must be identified and they need to be URIs
1110: String localPAssertionId = "rpa" + index;
1111: String subjectLocalPAssertionId = "1";
1112: String subjectParameterName = "http://pegasus.isi.edu/schema/pasoa/type/outputfile";
1113:
1114: // Create the information to identify the object of the relationship
1115:
1116: String objectLocalPAssertionId = "1"; // points to the interaction p-assertion of the invocation interaction receiver
1117:
1118: GlobalPAssertionKey gpak = new GlobalPAssertionKey(
1119: invocationik, "receiver", objectLocalPAssertionId);
1120: String objectParameterName = "http://pegasus.isi.edu/schema/pasoa/type/inputfile";
1121:
1122: Element dataAccessor = createTransferDataAccessor(index);
1123:
1124: ObjectID objId = new ObjectID(gpak, objectParameterName,
1125: dataAccessor, null);
1126:
1127: // We add the objId to the list of objects. We only have one objectId here
1128: // but when making another type of relationship more than one objectId may
1129: // be required
1130: LinkedList objectIds = new LinkedList();
1131: objectIds.add(objId);
1132:
1133: // Create the "use" relation. Again this should be a URI
1134: String relation = "http://pegasus.isi.edu/pasoa/relation/transfer/copy-of";
1135: dataAccessor = createTransferDataAccessor(index);
1136: // Finally, create the relationship object and return it.
1137: RelationshipPAssertion rel = new RelationshipPAssertion(
1138: localPAssertionId, subjectLocalPAssertionId,
1139: dataAccessor, subjectParameterName, relation, objectIds);
1140:
1141: return rel;
1142:
1143: }
1144:
1145: //will have to do for handling merged jobs correctly.
1146: private InteractionPAssertion createMergedJobInvocationInteractionPAssertion()
1147: throws Exception {
1148:
1149: String localPAssertionId = "1";
1150:
1151: // this message content will be obtained by parsing the transfer input files <jobid.in> and obtaining the source urls
1152: StringBuffer message = new StringBuffer(
1153: "<files link=\"input\" xmlns=\"http://pegasus.isi.edu/schema/pasoa/content/files\">");
1154: if (input != null) {
1155: if (input.containsKey(jobname)) {
1156: List inputs = (List) input.get(jobname);
1157: for (Iterator i = inputs.iterator(); i.hasNext();) {
1158: message.append("<filename>" + (String) i.next()
1159: + "</filename>");
1160: }
1161: }
1162: }
1163: message.append("</files>");
1164:
1165: // Convert it into a DOM Element
1166: DocumentBuilderFactory dbf = DocumentBuilderFactory
1167: .newInstance();
1168: DocumentBuilder db = dbf.newDocumentBuilder();
1169: Document msgDoc = db.parse(new InputSource(new StringReader(
1170: message.toString())));
1171: Element messageBody = msgDoc.getDocumentElement();
1172:
1173: InteractionPAssertion ipa = new InteractionPAssertion(
1174: localPAssertionId, documentationStyle, messageBody);
1175:
1176: return ipa;
1177:
1178: }
1179:
1180: private InteractionPAssertion createJobInvocationInteractionPAssertion()
1181: throws Exception {
1182:
1183: String localPAssertionId = "1";
1184:
1185: // this message content will be obtained by parsing the transfer input files <jobid.in> and obtaining the source urls
1186:
1187: StringBuffer message = new StringBuffer(
1188: "<files link=\"input\" xmlns=\"http://pegasus.isi.edu/schema/pasoa/content/files\">");
1189: if (input != null) {
1190: if (input.containsKey(jobname)) {
1191: List inputs = (List) input.get(jobname);
1192: for (Iterator i = inputs.iterator(); i.hasNext();) {
1193: message.append("<filename>" + (String) i.next()
1194: + "</filename>");
1195: }
1196: }
1197: }
1198: message.append("</files>");
1199:
1200: // Convert it into a DOM Element
1201: DocumentBuilderFactory dbf = DocumentBuilderFactory
1202: .newInstance();
1203: DocumentBuilder db = dbf.newDocumentBuilder();
1204: Document msgDoc = db.parse(new InputSource(new StringReader(
1205: message.toString())));
1206: Element messageBody = msgDoc.getDocumentElement();
1207:
1208: InteractionPAssertion ipa = new InteractionPAssertion(
1209: localPAssertionId, documentationStyle, messageBody);
1210:
1211: return ipa;
1212:
1213: }
1214:
1215: private InteractionPAssertion createJobCompletionInteractionPAssertion()
1216: throws Exception {
1217:
1218: String localPAssertionId = "1";
1219:
1220: // this message content will be obtained by parsing the transfer input files <jobid.in> and obtaining the source urls
1221:
1222: StringBuffer message = new StringBuffer(
1223: "<files link=\"output\" xmlns=\"http://pegasus.isi.edu/schema/pasoa/content/files\">");
1224: if (output != null) {
1225: if (output.containsKey(jobname)) {
1226: List outputs = (List) output.get(jobname);
1227: for (Iterator i = outputs.iterator(); i.hasNext();) {
1228: message.append("<filename>" + (String) i.next()
1229: + "</filename>");
1230: }
1231: }
1232: }
1233: message.append("</files>");
1234:
1235: // Convert it into a DOM Element
1236: DocumentBuilderFactory dbf = DocumentBuilderFactory
1237: .newInstance();
1238: DocumentBuilder db = dbf.newDocumentBuilder();
1239: Document msgDoc = db.parse(new InputSource(new StringReader(
1240: message.toString())));
1241: Element messageBody = msgDoc.getDocumentElement();
1242:
1243: InteractionPAssertion ipa = new InteractionPAssertion(
1244: localPAssertionId, documentationStyle, messageBody);
1245:
1246: return ipa;
1247: }
1248:
1249: private RelationshipPAssertion createJobRelationshipPAssertion(
1250: InteractionKey invocationik, String filename, long index)
1251: throws Exception {
1252: String localPAssertionId = "rpa" + index;
1253: String subjectLocalPAssertionId = "1";
1254: String subjectParameterName = "http://pegasus.isi.edu/schema/pasoa/type/outputfile";
1255:
1256: // Create the information to identify the object of the relationship
1257:
1258: String objectLocalPAssertionId = "1"; // points to the interaction p-assertion of the invocation interaction receiver
1259:
1260: GlobalPAssertionKey gpak = new GlobalPAssertionKey(
1261: invocationik, "receiver", objectLocalPAssertionId);
1262: String objectParameterName = "http://pegasus.isi.edu/schema/pasoa/type/inputfile";
1263: LinkedList objectIds = new LinkedList();
1264:
1265: for (Iterator i = ((List) input.get(jobname)).iterator(); i
1266: .hasNext();) {
1267: Element dataAccessor = createLFNDataAccessor((String) i
1268: .next());
1269: // We add the objId to the list of objects. We only have one objectId here
1270: // but when making another type of relationship more than one objectId may
1271: // be required
1272: objectIds.add(new ObjectID(gpak, objectParameterName,
1273: dataAccessor, null));
1274: }
1275:
1276: // Create the "use" relation. Again this should be a URI
1277: String relation = "http://pegasus.isi.edu/pasoa/relation/transformation/product-of";
1278: Element dataAccessor = createLFNDataAccessor(filename);
1279:
1280: // Finally, create the relationship object and return it.
1281: RelationshipPAssertion rel = new RelationshipPAssertion(
1282: localPAssertionId, subjectLocalPAssertionId,
1283: dataAccessor, subjectParameterName, relation, objectIds);
1284:
1285: return rel;
1286:
1287: }
1288:
1289: private RelationshipPAssertion createJobToTransferRelationshipPAssertion(
1290: String filename, String parentjob, int index)
1291: throws Exception {
1292: String localPAssertionId = "rpa" + index;
1293: String subjectLocalPAssertionId = "1";
1294: String subjectParameterName = "http://pegasus.isi.edu/schema/pasoa/type/inputfile";
1295:
1296: // Create the information to identify the object of the relationship
1297:
1298: String objectLocalPAssertionId = "1"; // points to the interaction p-assertion of the invocation interaction receiver
1299: // interaction.
1300:
1301: WSAddressEndpoint source = new WSAddressEndpoint(parentjob);
1302: WSAddressEndpoint sink = new WSAddressEndpoint(CONDOR);
1303:
1304: String interactionId = wf_label + wf_planned_time + parentjob;
1305: InteractionKey ik = new InteractionKey(source.getElement(),
1306: sink.getElement(), interactionId);
1307:
1308: GlobalPAssertionKey gpak = new GlobalPAssertionKey(ik,
1309: "receiver", objectLocalPAssertionId);
1310: String objectParameterName = "http://pegasus.isi.edu/schema/pasoa/type/outputfile";
1311:
1312: Element dataAccessor = createLFNDataAccessor(filename);
1313:
1314: ObjectID objId = new ObjectID(gpak, objectParameterName,
1315: dataAccessor, null);
1316:
1317: // We add the objId to the list of objects. We only have one objectId here
1318: // but when making another type of relationship more than one objectId may
1319: // be required
1320: LinkedList objectIds = new LinkedList();
1321: objectIds.add(objId);
1322:
1323: // Create the "use" relation. Again this should be a URI
1324: String relation = "http://pegasus.isi.edu/pasoa/relation/transfer/same-as";
1325: // dataAccessor=createNameValueDataAccessor(filename);
1326: // Finally, create the relationship object and return it.
1327: RelationshipPAssertion rel = new RelationshipPAssertion(
1328: localPAssertionId, subjectLocalPAssertionId,
1329: dataAccessor, subjectParameterName, relation, objectIds);
1330:
1331: return rel;
1332: }
1333:
1334: private Element createTransferDataAccessor(long index) {
1335: Map namespaces = new HashMap();
1336: namespaces.put("tr",
1337: "http://pegasus.isi.edu/schema/pasoa/content/transfer");
1338: return new org.pasoa.accessors.snxpath.SingleNodeXPathManager()
1339: .createAccessor("/tr:transfer[0]/tr:filename[" + index
1340: + "]", namespaces);
1341:
1342: }
1343:
1344: private Element createLFNDataAccessor(String value) {
1345: return new org.pasoa.accessors.lfn.LFNAccessorManager()
1346: .createLFNAccessor(value);
1347: }
1348:
1349: private InteractionPAssertion createRegisterInvocationInteractionPAssertion()
1350: throws Exception {
1351: // Create an interaction p-assertion
1352: // First we make a local p-assertion id and then
1353: // we make a documentationStyle. In this case we
1354: // call it verbatium.
1355: //
1356: // In most cases, you'll be grabing the messageBody from the message
1357: // being sent between parties. So a SOAP message, or a CORBA message.
1358: // With this example we'll just use a hard coded message body.
1359:
1360: String localPAssertionId = "1";
1361:
1362: BufferedReader bf = new BufferedReader(new FileReader(new File(
1363: jobname + ".in")));
1364: String line = null;
1365: StringBuffer message = new StringBuffer(
1366: "<register xmlns=\"http://pegasus.isi.edu/schema/pasoa/content/register\">");
1367: while ((line = bf.readLine()) != null) {
1368: filecount++;
1369: String[] lfn = line.split(" ");
1370: message.append("<filename file=\"" + lfn[0] + "\">"
1371: + lfn[1] + "</filename>");
1372:
1373: }
1374: message.append("</register>");
1375:
1376: // Convert it into a DOM Element
1377: DocumentBuilderFactory dbf = DocumentBuilderFactory
1378: .newInstance();
1379: DocumentBuilder db = dbf.newDocumentBuilder();
1380: Document msgDoc = db.parse(new InputSource(new StringReader(
1381: message.toString())));
1382: Element messageBody = msgDoc.getDocumentElement();
1383:
1384: InteractionPAssertion ipa = new InteractionPAssertion(
1385: localPAssertionId, documentationStyle, messageBody);
1386:
1387: return ipa;
1388: }
1389:
1390: public InteractionKey registerInvocationInteraction()
1391: throws Exception {
1392:
1393: System.out
1394: .println("We now create the transfer Invocation interaction key");
1395:
1396: // Create addresses for the source and sink of the
1397: // interaction.
1398: WSAddressEndpoint source = new WSAddressEndpoint(CONDOR);
1399: WSAddressEndpoint sink = new WSAddressEndpoint(jobname);
1400:
1401: String interactionId = wf_label + wf_planned_time + jobname;
1402: InteractionKey ik = new InteractionKey(source.getElement(),
1403: sink.getElement(), interactionId);
1404:
1405: System.out.println("Building p-assertions...");
1406:
1407: InteractionPAssertion ipa = createRegisterInvocationInteractionPAssertion();
1408:
1409: List records = new ArrayList();
1410: //setting sender type
1411: System.out
1412: .println("We are the sender/client view of the interaction");
1413: String vk = Constants.SENDER_VIEW_TYPE;
1414: System.out.println();
1415:
1416: //set asserter to CONDOR
1417:
1418: WSAddressEndpoint asserter = new WSAddressEndpoint(CONDOR);
1419:
1420: System.out
1421: .println("Creating Record objects for each p-assertion");
1422:
1423: Record recIpa = new Record(ipa, ik, vk, asserter.getElement());
1424: records.add(recIpa);
1425: Record recRpa = null;
1426:
1427: String tempparent = null;
1428: if (parents != null || !parents.isEmpty()) {
1429: tempparent = (String) parents.get(0);
1430: }
1431: for (int i = 0; i < filecount; i++) {
1432: recRpa = new Record(
1433: createRegisterToTransferRelationshipPAssertion(
1434: tempparent, i), ik, vk, asserter
1435: .getElement());
1436: records.add(recRpa);
1437: }
1438: System.out
1439: .println("Recording the p-assertions in provenance store "
1440: + provenanceStore);
1441:
1442: clientLib.record(records.iterator(), provenanceStore, true);
1443:
1444: System.out.println("sender p-assertions recorded");
1445: System.out.println();
1446:
1447: //setting reciever type
1448:
1449: System.out
1450: .println("We are the sender/client view of the interaction");
1451: vk = Constants.RECEIVER_VIEW_TYPE;
1452: System.out.println();
1453:
1454: //set asserter to CONDOR
1455:
1456: asserter = new WSAddressEndpoint(jobname);
1457: recIpa = new Record(ipa, ik, vk, asserter.getElement());
1458:
1459: System.out
1460: .println("Recording the p-assertions in provenance store "
1461: + provenanceStore);
1462:
1463: clientLib.record(recIpa, provenanceStore);
1464:
1465: System.out.println("receiver p-assertions recorded");
1466: System.out.println();
1467:
1468: return ik;
1469:
1470: }
1471:
1472: public void registerCompletionInteraction(
1473: InteractionKey invocationinteractionkey) throws Exception {
1474:
1475: System.out
1476: .println("We now create the register Completion interaction key");
1477:
1478: // Create addresses for the source and sink of the
1479: // interaction.
1480: WSAddressEndpoint source = new WSAddressEndpoint(jobname);
1481: WSAddressEndpoint sink = new WSAddressEndpoint(CONDOR);
1482:
1483: String interactionId = wf_label + wf_planned_time + jobname;
1484: InteractionKey ik = new InteractionKey(source.getElement(),
1485: sink.getElement(), interactionId);
1486:
1487: System.out.println("Building p-assertions...");
1488: List records = new ArrayList();
1489: //setting sender type
1490: System.out
1491: .println("We are the sender/client view of the interaction");
1492: String vk = Constants.SENDER_VIEW_TYPE;
1493: System.out.println();
1494:
1495: WSAddressEndpoint asserter = new WSAddressEndpoint(jobname);
1496:
1497: System.out
1498: .println("Creating Record objects for each p-assertion ....... ");
1499:
1500: ActorStatePAssertion apa = createActorStatePAssertion(0);
1501: System.out.println("Done");
1502: Record recApa = new Record(apa, ik, vk, asserter.getElement());
1503:
1504: records.add(recApa);
1505: System.out.print("Recording sender p-assertions ............ ");
1506:
1507: clientLib.record(records.iterator(), provenanceStore, true);
1508:
1509: System.out.println("DONE\n");
1510:
1511: //setting reciever type
1512:
1513: System.out
1514: .println("We are the sender/client view of the interaction\n");
1515: vk = Constants.RECEIVER_VIEW_TYPE;
1516: asserter = new WSAddressEndpoint(CONDOR);
1517:
1518: //no receiver InteractionPAssertion.
1519:
1520: }
1521:
1522: private RelationshipPAssertion createRegisterToTransferRelationshipPAssertion(
1523: String parentjob, long index) throws Exception {
1524: // Create a relationship p-assertion
1525: // Again a different local p-assertion id
1526: //
1527: // We'll create a "usage" relationship between the interaction p-assertion
1528: // and the actor state p-assertion. This relationship says that
1529: // message represented by interaction p-assertion "1" used the actor state
1530: // represented by actor state p-assertion "2".
1531: // There are no data accessors or links so we pass null.
1532:
1533: // Create the information to identify the subject of the relationship
1534: // Remember, parameter names must be identified and they need to be URIs
1535: String localPAssertionId = "rpa" + index;
1536: String subjectLocalPAssertionId = "1";
1537: String subjectParameterName = "http://pegasus.isi.edu/schema/pasoa/type/outputfile";
1538:
1539: // Create the information to identify the object of the relationship
1540:
1541: String objectLocalPAssertionId = "1"; // points to the interaction p-assertion of the invocation interaction receiver
1542:
1543: WSAddressEndpoint source = new WSAddressEndpoint(parentjob);
1544: WSAddressEndpoint sink = new WSAddressEndpoint(CONDOR);
1545:
1546: String interactionId = wf_label + wf_planned_time + parentjob;
1547: InteractionKey ik = new InteractionKey(source.getElement(),
1548: sink.getElement(), interactionId);
1549:
1550: GlobalPAssertionKey gpak = new GlobalPAssertionKey(ik,
1551: "receiver", objectLocalPAssertionId);
1552: String objectParameterName = "http://pegasus.isi.edu/schema/pasoa/type/inputfile";
1553:
1554: Element dataAccessor = createTransferDataAccessor(index);
1555:
1556: ObjectID objId = new ObjectID(gpak, objectParameterName,
1557: dataAccessor, null);
1558:
1559: // We add the objId to the list of objects. We only have one objectId here
1560: // but when making another type of relationship more than one objectId may
1561: // be required
1562: LinkedList objectIds = new LinkedList();
1563: objectIds.add(objId);
1564:
1565: // Create the "use" relation. Again this should be a URI
1566: String relation = "http://pegasus.isi.edu/pasoa/relation/register/rls-mapping";
1567: // Finally, create the relationship object and return it.
1568: RelationshipPAssertion rel = new RelationshipPAssertion(
1569: localPAssertionId, subjectLocalPAssertionId,
1570: dataAccessor, subjectParameterName, relation, objectIds);
1571:
1572: return rel;
1573:
1574: }
1575: }
|