001: package org.griphyn.cPlanner.provenance.pasoa.pps;
002:
003: import org.griphyn.cPlanner.provenance.pasoa.PPS;
004:
005: import org.griphyn.cPlanner.classes.SubInfo;
006: import org.griphyn.cPlanner.engine.Refiner;
007:
008: import org.pasoa.common.BestPractice;
009: import org.pasoa.common.Constants;
010:
011: import org.pasoa.pstructure.GlobalPAssertionKey;
012: import org.pasoa.pstructure.InteractionKey;
013: import org.pasoa.pstructure.InteractionPAssertion;
014: import org.pasoa.pstructure.ObjectID;
015: import org.pasoa.pstructure.PAssertion;
016: import org.pasoa.pstructure.Record;
017: import org.pasoa.pstructure.RelationshipPAssertion;
018: import org.pasoa.pstructure.SubjectID;
019:
020: import org.pasoa.storeclient.ClientLib;
021:
022: import org.pasoa.util.httpsoap.WSAddressEndpoint;
023:
024: import java.io.IOException;
025: import java.io.StringReader;
026: import java.net.URL;
027: import java.util.Iterator;
028: import java.util.LinkedList;
029: import java.util.List;
030: import javax.xml.parsers.DocumentBuilder;
031: import javax.xml.parsers.DocumentBuilderFactory;
032: import org.w3c.dom.Element;
033: import org.xml.sax.InputSource;
034: import org.xml.sax.SAXException;
035:
036: /**
037: * Implements the PPS interface for recording documentation from a Pegasus refinement.
038: */
039: public class Pasoa implements PPS {
040: // The current workflow XML serialisation (except for the final footer part: see _xmlFooter below)
041: // This is built up cumulatively over time by the refiners providing XML fragments to add
042: private String _workflowXML;
043: // A count of the number of relationship p-assertions recorded (used to create unique p-assertion IDs)
044: private int _relationshipPAssertionCounter;
045: // The key for the interaction in which a refiner is invoked
046: private InteractionKey _causeKey;
047: // The key for the interaction in which a refiner completes
048: private InteractionKey _effectKey;
049: // The name (URI) of the current refinement step
050: private String _refinement;
051: // The unique name of the current refinement process, generated from system time
052: private String _refinementID;
053:
054: // The suffix to the XML workflow serialisation
055: //private static final String _xmlFooter = "</workflow>";
056: private static final String _xmlFooter = "";
057:
058: /**
059: * On initialisation, create a ClientLib object for communication with a
060: * store, set the store URL and create a namespace-aware DOM document parser.
061: */
062: public Pasoa() throws Exception {
063: _storeProxy = new ClientLib();
064: String storeURL = "http://localhost:8080/preserv-1.0";
065:
066: _storeRecordURL = new URL(storeURL + "/record");
067:
068: DocumentBuilderFactory factory = DocumentBuilderFactory
069: .newInstance();
070: factory.setNamespaceAware(true);
071: _builder = factory.newDocumentBuilder();
072: }
073:
074: /**
075: * On initialisation, create a ClientLib object for communication with a
076: * store, set the store URL and create a namespace-aware DOM document parser.
077: */
078: public Pasoa(String storeURL) throws Exception {
079: _storeProxy = new ClientLib();
080: _storeRecordURL = new URL(storeURL + "/record");
081:
082: DocumentBuilderFactory factory = DocumentBuilderFactory
083: .newInstance();
084: factory.setNamespaceAware(true);
085: _builder = factory.newDocumentBuilder();
086: }
087:
088: // PPS methods //
089:
090: public String beginWorkflowRefinementStep(Refiner workflow,
091: String refinementStepName, boolean firstStep)
092: throws Exception {
093: if (firstStep) {
094: _workflowXML = workflow.getXMLProducer().toXML();
095: _refinementID = Long.toString(System.currentTimeMillis());
096: _causeKey = createInteractionKey(_refinementID,
097: refinementStepName, true);
098: } else {
099: // Record relationships between output of one refinement, input of the one being started
100: _causeKey = _effectKey;
101: _effectKey = createInteractionKey(_refinementID,
102: refinementStepName, true);
103: for (Iterator it = workflow.getWorkflow().jobIterator(); it
104: .hasNext();) {
105: SubInfo job = (SubInfo) it.next();
106: String name = job.getName();
107: isIdenticalTo(name, name);
108: }
109: // Now move on to refinement itself
110: _causeKey = _effectKey;
111: }
112: _effectKey = createInteractionKey(_refinementID,
113: refinementStepName, false);
114: _refinement = refinementStepName;
115: _relationshipPAssertionCounter = 0;
116:
117: // Record the initial invocation of the refiner
118: recordInteraction(_workflowXML + _xmlFooter, _causeKey,
119: _refinement, true);
120:
121: return _refinementID;
122: }
123:
124: public void isIdenticalTo(String afterNode, String beforeNode)
125: throws Exception {
126: recordRelationship(_relationshipPAssertionCounter, afterNode,
127: _identicalParameter, _identicalRelation, _causeKey,
128: beforeNode, _identicalParameter, _refinement);
129: _relationshipPAssertionCounter += 1;
130: }
131:
132: public void siteSelectionFor(String afterNode, String beforeNode)
133: throws Exception {
134: recordRelationship(_relationshipPAssertionCounter, afterNode,
135: _siteSelectionOutputParameter, _siteSelectionRelation,
136: _causeKey, beforeNode, _siteSelectionInputParameter,
137: _refinement);
138: _relationshipPAssertionCounter += 1;
139: }
140:
141: public void stagingIntroducedFor(List stagingNodes, String appNode)
142: throws Exception {
143: for (Object stagingNode : stagingNodes) {
144: recordRelationship(_relationshipPAssertionCounter,
145: stagingNode.toString(), _stagingParameter,
146: _stagingRelation, _causeKey, appNode,
147: _stagedForParameter, _refinement);
148: _relationshipPAssertionCounter += 1;
149: }
150: }
151:
152: public void registrationIntroducedFor(String registrationNode,
153: String dataStagingNode) throws Exception {
154: recordRelationship(_relationshipPAssertionCounter,
155: dataStagingNode, _registrationParameter,
156: _registrationRelation, _causeKey, registrationNode,
157: _registrationOfParameter, _refinement);
158: _relationshipPAssertionCounter += 1;
159: }
160:
161: public void clusteringOf(String clusteredJob, List jobs)
162: throws Exception {
163: for (Object inCluster : jobs) {
164: recordRelationship(_relationshipPAssertionCounter,
165: clusteredJob, _clusterParameter,
166: _clusteredRelation, _causeKey,
167: inCluster.toString(), _inClusterParameter,
168: _refinement);
169: _relationshipPAssertionCounter += 1;
170: }
171: }
172:
173: public void isPartitionOf(String afterNode, List beforeNode) {
174: throw new UnsupportedOperationException();
175: }
176:
177: public void endWorkflowRefinementStep(Refiner workflow)
178: throws Exception {
179: _workflowXML += workflow.getXMLProducer().toXML();
180: recordInteraction(_workflowXML + _xmlFooter, _effectKey,
181: _refinement, false);
182: }
183:
184: // Utility constants and methods //
185:
186: /**
187: * A namespace we can use to identify relationships and concepts defined for Pegasus' provenance data
188: */
189: //private static final String _namespace = "http://www.isi.edu/pasoa";
190: // Relations:
191: // Relationships are asserted between workflow nodes before a refinement and
192: // those after the refinement. The former are 'objects' of the relationship,
193: // the latter are 'subjects'. Every relationship has a type which is identified
194: // by a URI.
195: //
196: // For each subject and object of a relationship, the role that each plays
197: // in the relationship must be declared, the role type being called the
198: // 'parameter name' and identified by a URI.
199: /**
200: * The identicalTo relationship relates a workflow node before and after a
201: * refinement that has not changed during that refinement
202: */
203: public static final String _identicalRelation = NAMESPACE
204: + "/relations#identicalTo";
205: /*
206: * In an identical relationship both subject and object play the role of
207: * 'item', as in 'this item is identical to that item'.
208: */
209: public static final String _identicalParameter = NAMESPACE
210: + "/parameters#item";
211:
212: /**
213: * The site seleciotn relationship relates a job that has had its site selected
214: * to that same job before site selection.
215: */
216: public static final String _siteSelectionRelation = NAMESPACE
217: + "/relations#siteSelectionOf";
218: /**
219: * The job before site selection plays the 'preselection' role.
220: */
221: public static final String _siteSelectionInputParameter = NAMESPACE
222: + "/parameters#preselection";
223: /**
224: * The job after site selection plays the 'postselection' role.
225: */
226: public static final String _siteSelectionOutputParameter = NAMESPACE
227: + "/parameters#postselection";
228:
229: public static final String _stagingRelation = NAMESPACE
230: + "/relations#staging";
231: public static final String _stagedForParameter = NAMESPACE
232: + "/parameters#stagedFor";
233: public static final String _stagingParameter = NAMESPACE
234: + "/parameters#staging";
235:
236: public static final String _registrationRelation = NAMESPACE
237: + "/relations#registration";
238: public static final String _registrationOfParameter = NAMESPACE
239: + "/parameters#registrationOf";
240: public static final String _registrationParameter = NAMESPACE
241: + "/parameters#registration";
242:
243: public static final String _clusteredRelation = NAMESPACE
244: + "/relations#clustered";
245: public static final String _inClusterParameter = NAMESPACE
246: + "/parameters#inCluster";
247: public static final String _clusterParameter = NAMESPACE
248: + "/parameters#cluster";
249:
250: /**
251: * A partially refined workflow is specified as an XML document.
252: * We represent this as a String object, and for convenience this is
253: * the closing tag of that document.
254: */
255: private static final String _workflowPostfix = "</workflow>";
256:
257: /** ClientLib is the primary class by which a client communicates with a provenance store */
258: private ClientLib _storeProxy;
259: /** The URL of the provenance store Web Service (recording port) */
260: private URL _storeRecordURL;
261: /** A pre-created DOM XML parser (expensive to create so we do just once) */
262: private DocumentBuilder _builder;
263:
264: /**
265: * Conventionally, we use WS-Addressing to identify the endpoints of an
266: * interaction between actors, and this method constructs an XML (DOM) fragment
267: * in the WS-Addressing schema for a particular URL.
268: *
269: * @param address The URL of the endpoint
270: * @return An XML (DOM) fragment in WS-Addressing endpoint schema containing the address
271: */
272: public static Element addressToElement(String address) {
273: return new WSAddressEndpoint(address).getElement();
274: }
275:
276: /**
277: * Individual jobs in a workflow are identified by an XML document fragment,
278: * called a data accessor, and this method constructs the fragment for a given
279: * job ID.
280: *
281: * @param causeJobID The job ID
282: * @return An XML (DOM) fragment representing a reference to that job in an XML workflow representation
283: */
284: public Element createDataAccessor(String jobID) throws IOException,
285: SAXException {
286: return toElement("<jobID xmlns = \"" + NAMESPACE + "\">"
287: + jobID + "</jobID>");
288: }
289:
290: /**
291: * Creates an interaction p-assertion asserting that a given partially
292: * refined workflow was exchanged between actors.
293: *
294: * @param workflow The (XML) content of the partially refined workflow
295: * @return A JavaBean representation of an interaction p-assertion containing the workflow
296: */
297: public InteractionPAssertion createInteractionPAssertion(
298: String workflow) throws IOException, SAXException {
299: return new InteractionPAssertion("1",
300: BestPractice.VERBATIM_STYLE, toElement(workflow
301: + _workflowPostfix));
302: }
303:
304: /**
305: * Creates an interaction key to identify an interaction between two actors.
306: *
307: * @refinementID The unique identifier for this workflow refinement (run of Pegasus)
308: * @refinementAddress The URI of the particular refinement step (site selection, cluster etc.)
309: * @preRefinement True if the interaction is pre-refinement, i.e. from Pegasus to a refiner, rather than the other way round
310: */
311: public InteractionKey createInteractionKey(String refinementID,
312: String refinementAddress, boolean preRefinement) {
313: if (preRefinement) {
314: return new InteractionKey(addressToElement(PEGASUS),
315: addressToElement(refinementAddress), refinementID
316: + "Start");
317: } else {
318: return new InteractionKey(
319: addressToElement(refinementAddress),
320: addressToElement(PEGASUS), refinementID + "End");
321: }
322: }
323:
324: /**
325: * Creates a relationship p-assertion between nodes in two partially refined workflows.
326: *
327: * @param count The index of this relationship p-assertion in the interaction (to support the requirement that each p-assertion has a unique ID)
328: * @param effectJobID The job ID of the subject (effect) of the relationship
329: * @param effectParameter The role played by the subject of the relationship
330: * @param relationType The type of the relationship
331: * @param causeKey The interaction key of the object of the relationship
332: * @param causeJobID The job ID of the object (cause) of the relationship
333: * @param causeParameter The role played by the object of the relationship
334: * @return A RelationshipPAssertion JavaBean representing the relationship p-assertion with the given arguments
335: */
336: public RelationshipPAssertion createRelationship(int count,
337: String effectJobID, String effectParameter,
338: String relationType, InteractionKey causeKey,
339: String causeJobID, String causeParameter)
340: throws IOException, SAXException {
341: List<ObjectID> objectIDs = new LinkedList<ObjectID>();
342: ObjectID objectID = new ObjectID(new GlobalPAssertionKey(
343: causeKey, Constants.RECEIVER_VIEW_TYPE, "1"),
344: effectParameter, createDataAccessor(causeJobID), null);
345:
346: objectIDs.add(objectID);
347:
348: return new RelationshipPAssertion("RPA" + count, new SubjectID(
349: "1", createDataAccessor(effectJobID), effectParameter),
350: relationType, objectIDs);
351: }
352:
353: public void record(PAssertion passertion,
354: InteractionKey interactionKey, boolean isSender,
355: String asserterURL) throws Exception {
356: if (isSender) {
357: _storeProxy.record(new Record(passertion, interactionKey,
358: Constants.SENDER_VIEW_TYPE,
359: addressToElement(asserterURL)), _storeRecordURL);
360: } else {
361: _storeProxy.record(new Record(passertion, interactionKey,
362: Constants.RECEIVER_VIEW_TYPE,
363: addressToElement(asserterURL)), _storeRecordURL);
364: }
365: }
366:
367: public void recordInteraction(InteractionPAssertion passertion,
368: InteractionKey interactionKey, String refinerType,
369: boolean refinementInput) throws Exception {
370: if (refinementInput) {
371: record(passertion, interactionKey, true, PEGASUS);
372: record(passertion, interactionKey, false, refinerType);
373: } else {
374: record(passertion, interactionKey, true, refinerType);
375: record(passertion, interactionKey, false, PEGASUS);
376: }
377: }
378:
379: public void recordInteraction(String workflow,
380: InteractionKey interactionKey, String refinerType,
381: boolean refinementInput) throws Exception {
382: recordInteraction(createInteractionPAssertion(workflow),
383: interactionKey, refinerType, refinementInput);
384: }
385:
386: public RelationshipPAssertion recordRelationship(int count,
387: String effectJobID, String effectParameter,
388: String relationType, InteractionKey causeKey,
389: String causeJobID, String causeParameter, String asserterURL)
390: throws Exception {
391: RelationshipPAssertion passertion = createRelationship(count,
392: effectJobID, effectParameter, relationType, causeKey,
393: causeJobID, causeParameter);
394: record(passertion, _effectKey, true, asserterURL);
395: return passertion;
396: }
397:
398: /**
399: * Convenience method to parse string represented XML into a DOM XML fragment representation
400: */
401: public Element toElement(String xmlAsString) throws IOException,
402: SAXException {
403: //System.out.println( "XML as string is " + xmlAsString );
404: return _builder.parse(
405: new InputSource(new StringReader(xmlAsString)))
406: .getDocumentElement();
407: }
408: }
|