001: /*
002: * Copyright 2007 The Kuali Foundation.
003: *
004: * Licensed under the Educational Community License, Version 1.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.opensource.org/licenses/ecl1.php
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.kuali.kfs.batch;
017:
018: import java.util.ArrayList;
019: import java.util.List;
020:
021: import org.apache.log4j.Appender;
022: import org.apache.log4j.Logger;
023: import org.kuali.core.UserSession;
024: import org.kuali.core.exceptions.UserNotFoundException;
025: import org.kuali.core.util.ErrorMap;
026: import org.kuali.core.util.GlobalVariables;
027: import org.kuali.kfs.KFSConstants;
028: import org.kuali.kfs.service.ParameterService;
029: import org.kuali.kfs.service.SchedulerService;
030: import org.quartz.InterruptableJob;
031: import org.quartz.JobExecutionContext;
032: import org.quartz.JobExecutionException;
033: import org.quartz.StatefulJob;
034: import org.quartz.UnableToInterruptJobException;
035:
036: import edu.iu.uis.eden.exception.WorkflowException;
037:
038: public class Job implements StatefulJob, InterruptableJob {
039:
040: public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP";
041: public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP";
042: private static final String STEP_RUN_PARM_NM = "RUN_IND";
043: private static final String STEP_USER_PARM_NM = "USER";
044: private static final Logger LOG = Logger.getLogger(Job.class);
045: private SchedulerService schedulerService;
046: private ParameterService parameterService;
047: private List<Step> steps;
048: private Step currentStep;
049: private Appender ndcAppender;
050: private boolean notRunnable;
051: private transient Thread workerThread;
052:
053: /**
054: * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
055: */
056: public void execute(JobExecutionContext jobExecutionContext)
057: throws JobExecutionException {
058: workerThread = Thread.currentThread();
059: if (isNotRunnable()) {
060: if (LOG.isInfoEnabled()) {
061: LOG.info("Skipping job because doNotRun is true: "
062: + jobExecutionContext.getJobDetail().getName());
063: }
064: return;
065: }
066: int startStep = 0;
067: try {
068: startStep = Integer.parseInt(jobExecutionContext
069: .getMergedJobDataMap()
070: .getString(JOB_RUN_START_STEP));
071: } catch (NumberFormatException ex) {
072: // not present, do nothing
073: }
074: int endStep = 0;
075: try {
076: endStep = Integer.parseInt(jobExecutionContext
077: .getMergedJobDataMap().getString(JOB_RUN_END_STEP));
078: } catch (NumberFormatException ex) {
079: // not present, do nothing
080: }
081: int currentStepNumber = 0;
082: try {
083: LOG.info("Executing job: "
084: + jobExecutionContext.getJobDetail()
085: + "\n"
086: + jobExecutionContext.getJobDetail()
087: .getJobDataMap());
088: for (Step step : getSteps()) {
089: currentStepNumber++;
090: // prevent starting of the next step if the thread has an interrupted status
091: if (workerThread.isInterrupted()) {
092: LOG
093: .warn("Aborting Job execution due to manual interruption");
094: schedulerService.updateStatus(jobExecutionContext
095: .getJobDetail(),
096: SchedulerService.CANCELLED_JOB_STATUS_CODE);
097: return;
098: }
099: if (startStep > 0 && currentStepNumber < startStep) {
100: if (LOG.isInfoEnabled()) {
101: LOG.info("Skipping step " + currentStepNumber
102: + " - startStep=" + startStep);
103: }
104: continue; // skip to next step
105: } else if (endStep > 0 && currentStepNumber > endStep) {
106: if (LOG.isInfoEnabled()) {
107: LOG
108: .info("Ending step loop - currentStepNumber="
109: + currentStepNumber
110: + " - endStep = " + endStep);
111: }
112: break;
113: }
114: step.setInterrupted(false);
115: try {
116: if (!runStep(parameterService, jobExecutionContext
117: .getJobDetail().getFullName(),
118: currentStepNumber, step)) {
119: break;
120: }
121: } catch (InterruptedException ex) {
122: LOG.warn("Stopping after step interruption");
123: schedulerService.updateStatus(jobExecutionContext
124: .getJobDetail(),
125: SchedulerService.CANCELLED_JOB_STATUS_CODE);
126: return;
127: }
128: if (step.isInterrupted()) {
129: LOG
130: .warn("attempt to interrupt step failed, step continued to completion");
131: LOG
132: .warn("cancelling remainder of job due to step interruption");
133: schedulerService.updateStatus(jobExecutionContext
134: .getJobDetail(),
135: SchedulerService.CANCELLED_JOB_STATUS_CODE);
136: return;
137: }
138: }
139: } catch (Exception e) {
140: schedulerService.updateStatus(jobExecutionContext
141: .getJobDetail(),
142: SchedulerService.FAILED_JOB_STATUS_CODE);
143: throw new JobExecutionException("Caught exception in "
144: + jobExecutionContext.getJobDetail().getName(), e,
145: false);
146: }
147: LOG.info("Finished executing job: "
148: + jobExecutionContext.getJobDetail().getName());
149: schedulerService.updateStatus(jobExecutionContext
150: .getJobDetail(),
151: SchedulerService.SUCCEEDED_JOB_STATUS_CODE);
152: }
153:
154: public static boolean runStep(ParameterService parameterService,
155: String jobName, int currentStepNumber, Step step)
156: throws InterruptedException, UserNotFoundException,
157: WorkflowException {
158: boolean continueJob = true;
159: LOG.info(new StringBuffer("Started processing step: ").append(
160: currentStepNumber).append("=").append(step.getName()));
161: if (parameterService.parameterExists(step.getClass(),
162: STEP_RUN_PARM_NM)
163: && !parameterService.getIndicatorParameter(step
164: .getClass(), STEP_RUN_PARM_NM)) {
165: if (LOG.isInfoEnabled()) {
166: LOG.info("Skipping step due to system parameter: "
167: + STEP_RUN_PARM_NM);
168: }
169: } else {
170: GlobalVariables.setErrorMap(new ErrorMap());
171: GlobalVariables.setMessageList(new ArrayList());
172: String stepUserName = KFSConstants.SYSTEM_USER;
173: if (parameterService.parameterExists(step.getClass(),
174: STEP_USER_PARM_NM)) {
175: stepUserName = parameterService.getParameterValue(step
176: .getClass(), STEP_USER_PARM_NM);
177: }
178: if (LOG.isInfoEnabled()) {
179: LOG.info(new StringBuffer(
180: "Creating user session for step: ").append(
181: step.getName()).append("=")
182: .append(stepUserName));
183: }
184: GlobalVariables
185: .setUserSession(new UserSession(stepUserName));
186: if (LOG.isInfoEnabled()) {
187: LOG.info(new StringBuffer("Executing step: ").append(
188: step.getName()).append("=").append(
189: step.getClass()));
190: }
191: if (!step.execute(jobName)) {
192: continueJob = false;
193: LOG
194: .info("Stopping job after successful step execution");
195: }
196: }
197: LOG.info(new StringBuffer("Finished processing step ").append(
198: currentStepNumber).append(": ").append(step.getName()));
199: return continueJob;
200: }
201:
202: /**
203: * @throws UnableToInterruptJobException
204: */
205: public void interrupt() throws UnableToInterruptJobException {
206: // ask the step to interrupt
207: if (currentStep != null) {
208: currentStep.interrupt();
209: }
210: // also attempt to interrupt the thread, to cause an InterruptedException if the step ever waits or sleeps
211: workerThread.interrupt();
212: }
213:
214: public void setParameterService(ParameterService parameterService) {
215: this .parameterService = parameterService;
216: }
217:
218: public void setSteps(List<Step> steps) {
219: this .steps = steps;
220: }
221:
222: public Appender getNdcAppender() {
223: return ndcAppender;
224: }
225:
226: public void setNdcAppender(Appender ndcAppender) {
227: this .ndcAppender = ndcAppender;
228: }
229:
230: public void setNotRunnable(boolean notRunnable) {
231: this .notRunnable = notRunnable;
232: }
233:
234: protected boolean isNotRunnable() {
235: return notRunnable;
236: }
237:
238: public ParameterService getParameterService() {
239: return parameterService;
240: }
241:
242: public List<Step> getSteps() {
243: return steps;
244: }
245:
246: public void setSchedulerService(SchedulerService schedulerService) {
247: this.schedulerService = schedulerService;
248: }
249: }
|