001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.visualize.nodeusage;
017:
018: import org.griphyn.cPlanner.common.LogManager;
019:
020: import org.griphyn.common.util.Currently;
021:
022: import org.griphyn.cPlanner.visualize.Callback;
023: import org.griphyn.cPlanner.visualize.JobMeasurements;
024: import org.griphyn.cPlanner.visualize.WorkflowMeasurements;
025:
026: import org.griphyn.vdl.invocation.StatInfo;
027:
028: import java.util.List;
029: import java.util.Iterator;
030: import java.util.StringTokenizer;
031: import java.util.Map;
032: import java.util.HashMap;
033: import java.util.Date;
034:
035: import java.io.File;
036: import java.io.FileReader;
037: import java.io.BufferedReader;
038: import java.io.IOException;
039:
040: /**
041: * Implements callback interface to calculate node usage or number of jobs
042: * over time.
043: *
044: * @author Karan Vahi
045: * @version $Revision: 50 $
046: */
047:
048: public class NodeUsageCallback implements Callback {
049:
050: /**
051: * The name of the tailstatd file.
052: */
053: public static final String JOBSTATE_LOG = "jobstate.log";
054:
055: /**
056: * The state in the jobstate that is taken to designate the GRIDSTART_PREJOB
057: * time.
058: */
059: public static final String GRIDSTART_PREJOB_STATE = "EXECUTE";
060:
061: /**
062: * The state in the jobstate that is taken to designate the GRIDSTART_MAINJOB
063: * time.
064: */
065: public static final String GRIDSTART_MAINJOB_STATE = "EXECUTE";
066:
067: /**
068: * The state in the jobstate that is taken to designate the GRIDSTART_POSTJOB
069: * time.
070: */
071: public static final String GRIDSTART_POSTJOB_STATE = "JOB_TERMINATED";
072:
073: /**
074: * The logical site where the job was run.
075: */
076: protected String mSite;
077:
078: /**
079: * The WorkflowMeasurements object created during the callback construction.
080: */
081: protected WorkflowMeasurements mWFMeasurements;
082:
083: /**
084: * The main job whose record is being parsed.
085: */
086: protected String mMainJob;
087:
088: /**
089: * The handle to the logger.
090: */
091: protected LogManager mLogger;
092:
093: /**
094: * Stores all the space readings for the current invocation record.
095: */
096: protected JobMeasurements mJobMeasurements;
097:
098: /**
099: * A Map store that stores JobMeasurements objects indexed by the name of the jobs.
100: */
101: private Map mJMStore;
102:
103: /**
104: * The directory where all the files reside.
105: */
106: private String mDirectory;
107:
108: /**
109: * The number of jobs executing at any given time per site.
110: */
111: private Map mNumJobsStore;
112:
113: /**
114: * The default constructor.
115: */
116: public NodeUsageCallback() {
117: mWFMeasurements = new WorkflowMeasurements();
118: mJMStore = new HashMap();
119: mNumJobsStore = new HashMap();
120: mLogger = LogManager.getInstance();
121: }
122:
123: /**
124: * Initializes the callback.
125: *
126: * @param directory the directory where all the files reside.
127: * @param useStatInfo boolean indicating whether to use stat info or not.
128: */
129: public void initialize(String directory, boolean useStatInfo) {
130: mDirectory = directory;
131: File jobstate = new File(directory, this .JOBSTATE_LOG);
132:
133: //some sanity checks on file
134: if (jobstate.exists()) {
135: if (!jobstate.canRead()) {
136: throw new RuntimeException(
137: "The jobstate file does not exist " + jobstate);
138: }
139: } else {
140: throw new RuntimeException(
141: "Unable to read the jobstate file " + jobstate);
142: }
143:
144: BufferedReader reader;
145:
146: try {
147: reader = new BufferedReader(new FileReader(jobstate));
148:
149: String line, time = null, job = null, state = null, token;
150: int count = 0;
151: StringTokenizer st;
152:
153: while ((line = reader.readLine()) != null) {
154: String site = null;
155: //parse the line contents
156: st = new StringTokenizer(line);
157: count = 1;
158: while (st.hasMoreTokens()) {
159: token = (String) st.nextToken();
160: switch (count) {
161: case 1:
162: time = token;
163: break;
164:
165: case 2:
166: job = token;
167: break;
168:
169: case 3:
170: state = token;
171: break;
172:
173: case 5:
174: site = token;
175: break;
176:
177: default:
178: break;
179: }
180: count++;
181: }
182:
183: if (!validState(state)) {
184: //ignore and move to next line
185: continue;
186: }
187:
188: JobMeasurements js = (mJMStore.containsKey(job)) ? (JobMeasurements) mJMStore
189: .get(job)
190: : new JobMeasurements(job);
191: Date d = new Date(Long.parseLong(time) * 1000);
192: //add date for that particular event
193: js.setTime(d, this .getEventType(state));
194:
195: if (state.equals(this .GRIDSTART_MAINJOB_STATE)) {
196: //increment value
197: int num = this .getCurrentNumOfJobs(site);
198: mNumJobsStore.put(site, new Integer(++num));
199: mWFMeasurements.addMeasurement(site,
200: new NumJobsMeasurement(d, new Integer(num),
201: job));
202: } else if (state.equals(this .GRIDSTART_POSTJOB_STATE)) {
203: //decrement value
204: int num = this .getCurrentNumOfJobs(site);
205: mNumJobsStore.put(site, new Integer(--num));
206: mWFMeasurements.addMeasurement(site,
207: new NumJobsMeasurement(d, new Integer(num),
208: job));
209: }
210:
211: // Space s = new Space( d );
212: // s.setAssociatedJob( job );
213: // js.addSpaceReading( s, this.getEventType( state ));
214:
215: //specific quirk because i am using same trigger for pre job and main job
216: // if( state.equals( this.GRIDSTART_PREJOB_STATE ) ){
217: // //add the same event reading for the main job
218: // js.addSpaceReading( (Space)s.clone(), JobMeasurements.GRIDSTART_MAINJOB_EVENT_TYPE );
219: // }
220:
221: //add the js back
222: mJMStore.put(job, js);
223: }
224: } catch (IOException ioe) {
225: throw new RuntimeException("While reading jobstate file "
226: + jobstate, ioe);
227: }
228:
229: //System.out.println( "Job space store is " + mJMStore );
230:
231: }
232:
233: public void cbInvocationStart(String job, String resource) {
234: mMainJob = job;
235: mSite = resource;
236: mJobMeasurements = new JobMeasurements(job);
237: }
238:
239: public void cbStdIN(List jobs, String data) {
240:
241: }
242:
243: public void cbStdOut(List jobs, String data) {
244: }
245:
246: public void cbStdERR(List jobs, String data) {
247:
248: }
249:
250: /**
251: * Returns the number of jobs that are executing for a particular site
252: *
253: * @param site the name of the site.
254: *
255: * @return
256: */
257: private int getCurrentNumOfJobs(String site) {
258: if (site == null) {
259: throw new RuntimeException("Null site specified");
260: }
261:
262: int value = 0;
263:
264: if (mNumJobsStore.containsKey(site)) {
265: value = ((Integer) mNumJobsStore.get(site)).intValue();
266: } else {
267: mNumJobsStore.put(site, new Integer(value));
268: }
269:
270: return value;
271: }
272:
273: /**
274: * Callback function for when stat information for an input file is
275: * encountered. Empty for time being.
276: *
277: * @param filename the name of the file.
278: * @param info the <code>StatInfo</code> about the file.
279: *
280: */
281: public void cbInputFile(String filename, StatInfo info) {
282: //do nothing
283: }
284:
285: /**
286: * Callback function for when stat information for an output file is
287: * encountered. The size of the file is computed and stored.
288: *
289: * @param filename the name of the file.
290: * @param info the <code>StatInfo</code> about the file.
291: *
292: */
293: public void cbOutputFile(String filename, StatInfo info) {
294: //do nothing
295: }
296:
297: /**
298: * Callback signalling that an invocation record has been parsed.
299: * Stores the total compute size, somewhere in the space structure
300: * for the jobs.
301: *
302: *
303: */
304: public void cbInvocationEnd() {
305:
306: }
307:
308: /**
309: * Returns the SpaceUsage store built.
310: *
311: * @return SpaceUsage
312: */
313: public Object getConstructedObject() {
314: return mWFMeasurements;
315: }
316:
317: /**
318: * Callback signalling that we are done with the parsing of the files.
319: */
320: public void done() {
321: mWFMeasurements.sort();
322: }
323:
324: /**
325: * Returns a boolean indicating whether the state is valid or not.
326: *
327: * @param state the state
328: *
329: * @return boolean
330: */
331: protected boolean validState(String state) {
332: return (state.equals(this .GRIDSTART_MAINJOB_STATE)
333: || state.equals(this .GRIDSTART_POSTJOB_STATE) || state
334: .equals(this .GRIDSTART_PREJOB_STATE));
335: }
336:
337: /**
338: * Returns the event type matching a particular job type
339: *
340: * @param type the state of the job
341: *
342: * @return the corresponding event type
343: */
344: private int getEventType(String state) {
345: int event = -1;
346: if (state.equals(this .GRIDSTART_PREJOB_STATE)) {
347: event = JobMeasurements.GRIDSTART_PREJOB_EVENT_TYPE;
348: } else if (state.equals(this .GRIDSTART_MAINJOB_STATE)) {
349: event = JobMeasurements.GRIDSTART_MAINJOB_EVENT_TYPE;
350: } else if (state.equals(this .GRIDSTART_POSTJOB_STATE)) {
351: event = JobMeasurements.GRIDSTART_POSTJOB_EVENT_TYPE;
352: }
353: return event;
354: }
355:
356: /**
357: * Returns boolean indicating whether the job is a cleanup job or not.
358: * Does it on the basis of the name of the job.
359: *
360: * @param name the name of the job.
361: *
362: * @return boolean
363: */
364: public boolean cleanupJob(String name) {
365: return name.startsWith("cln_");
366: }
367:
368: }
|