001: /*
002: * Copyright 2004 Outerthought bvba and Schaubroeck nv
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.outerj.daisy.doctaskrunner.serverimpl;
017:
018: import org.outerj.daisy.doctaskrunner.*;
019: import org.outerj.daisy.doctaskrunner.commonimpl.*;
020: import org.outerj.daisy.repository.Repository;
021: import org.outerj.daisy.repository.VariantKey;
022: import org.outerj.daisy.repository.spi.ExtensionProvider;
023: import org.outerj.daisy.repository.user.Role;
024: import org.outerj.daisy.jdbcutil.JdbcHelper;
025: import org.outerj.daisy.jdbcutil.SqlCounter;
026: import org.outerj.daisy.backuplock.spi.SuspendableProcess;
027: import org.outerj.daisy.plugin.PluginRegistry;
028: import org.apache.avalon.framework.configuration.Configuration;
029: import org.apache.avalon.framework.configuration.ConfigurationException;
030: import org.apache.commons.logging.Log;
031: import org.apache.commons.logging.LogFactory;
032:
033: import javax.sql.DataSource;
034: import javax.annotation.PreDestroy;
035: import java.sql.*;
036: import java.util.*;
037: import java.util.Date;
038: import java.util.concurrent.locks.ReadWriteLock;
039: import java.util.concurrent.locks.ReentrantReadWriteLock;
040: import java.util.concurrent.locks.Lock;
041: import java.util.concurrent.TimeUnit;
042:
043: public class CommonDocumentTaskManager implements SuspendableProcess {
044: private DataSource dataSource;
045: private PluginRegistry pluginRegistry;
046: private JdbcHelper jdbcHelper;
047: private Log log = LogFactory.getLog(getClass());
048: private SqlCounter taskCounter;
049: private final Map<Long, TaskHolder> tasksById = Collections
050: .synchronizedMap(new HashMap<Long, TaskHolder>());
051: private ExtensionProvider extensionProvider = new MyExtensionProvider();
052: private long taskJanitorTaskMaxAge;
053: private long taskJanitorRunInterval;
054: private Thread janitorThread;
055: private ReadWriteLock suspendLock = new ReentrantReadWriteLock(true);
056: private static final String EXTENSION_NAME = "DocumentTaskManager";
057: private static final String SUSPEND_PROCESS_NAME = "Document Task Manager";
058:
/**
 * Creates the document task manager and brings it into service right away:
 * the configuration is read, the component is initialised (which registers
 * it with the plugin registry) and the janitor thread is started.
 *
 * @param configuration configuration holding the "taskJanitor" child element
 * @param dataSource source of the JDBC connections
 * @param pluginRegistry registry in which the manager registers its plugins
 * @throws Exception if configuring, initialising or starting fails
 */
public CommonDocumentTaskManager(Configuration configuration,
        DataSource dataSource, PluginRegistry pluginRegistry) throws Exception {
    this.pluginRegistry = pluginRegistry;
    this.dataSource = dataSource;

    configure(configuration);
    initialize();
    start();
}
068:
069: @PreDestroy
070: public void destroy() throws Exception {
071: stop();
072: dispose();
073: }
074:
075: private void configure(Configuration configuration)
076: throws ConfigurationException {
077: this .taskJanitorTaskMaxAge = configuration.getChild(
078: "taskJanitor").getAttributeAsLong("maxAge");
079: this .taskJanitorRunInterval = configuration.getChild(
080: "taskJanitor").getAttributeAsLong("runInterval");
081: }
082:
083: private void initialize() throws Exception {
084: jdbcHelper = JdbcHelper.getInstance(dataSource, log);
085: taskCounter = new SqlCounter("task_sequence", dataSource, log);
086:
087: markRunningTasksAsInterrupted();
088: pluginRegistry.addPlugin(ExtensionProvider.class,
089: EXTENSION_NAME, extensionProvider);
090:
091: // The document task manager registers itself for suspending its active while
092: // a backup is being taken. Only the actual execution of the scripts will be
093: // blocked while in suspended state, other operations will keep working
094: // (including adding new document tasks).
095: pluginRegistry.addPlugin(SuspendableProcess.class,
096: SUSPEND_PROCESS_NAME, this );
097: }
098:
099: private void dispose() {
100: pluginRegistry.removePlugin(ExtensionProvider.class,
101: EXTENSION_NAME, extensionProvider);
102: pluginRegistry.removePlugin(SuspendableProcess.class,
103: SUSPEND_PROCESS_NAME, this );
104: }
105:
106: private void start() throws Exception {
107: janitorThread = new Thread(new ExpiredTasksJanitor(),
108: "Daisy Expired Document Tasks Janitor");
109: janitorThread.start();
110: }
111:
112: private void stop() throws Exception {
113: // Mark all running tasks as interrupted
114: Collection<TaskHolder> currentTasks;
115: synchronized (tasksById) {
116: currentTasks = new ArrayList<TaskHolder>(tasksById.values());
117: }
118: if (currentTasks.size() > 0) {
119: log.info("Interrupting " + currentTasks.size()
120: + " running document tasks.");
121: for (TaskHolder taskHolder : currentTasks) {
122: log.info("Interrupting document task "
123: + taskHolder.getTaskContext().taskId);
124: taskHolder.getTaskContext().interrupt(
125: TaskState.INTERRUPTED_BY_SHUTDOWN);
126: }
127: }
128:
129: // Stop the janitor thread
130: log.info("Waiting for document task janitor thread to end.");
131: janitorThread.interrupt();
132: try {
133: janitorThread.join();
134: log.info("Document task janitor thread ended.");
135: } catch (InterruptedException e) {
136: // ignore
137: }
138:
139: // Wait for running tasks to end.
140: for (TaskHolder taskHolder : currentTasks) {
141: if (taskHolder.getThread().isAlive()) {
142: log.info("Waiting for document task thread for task "
143: + taskHolder.getTaskContext().taskId
144: + " to end.");
145: try {
146: taskHolder.getThread().join();
147: log.info("Document task thread for task "
148: + taskHolder.getTaskContext().taskId
149: + " ended.");
150: } catch (InterruptedException e) {
151: // ignore
152: }
153: }
154: }
155: }
156:
157: class MyExtensionProvider implements ExtensionProvider {
158: public Object createExtension(Repository repository) {
159: return new DocumentTaskManagerImpl(
160: CommonDocumentTaskManager.this , repository);
161: }
162: }
163:
164: public boolean suspendExecution(long msecs)
165: throws InterruptedException {
166: return suspendLock.writeLock().tryLock(msecs,
167: TimeUnit.MILLISECONDS);
168: }
169:
170: public void resumeExecution() {
171: suspendLock.writeLock().unlock();
172: }
173:
174: private void markRunningTasksAsInterrupted() throws Exception {
175: Connection conn = null;
176: PreparedStatement stmt = null;
177: try {
178: conn = dataSource.getConnection();
179: stmt = conn
180: .prepareStatement("update document_tasks set state = ? where state = ? or state = ?");
181: stmt.setString(1, TaskState.INTERRUPTED_BY_SHUTDOWN
182: .getCode());
183: stmt.setString(2, TaskState.INITIALISING.getCode());
184: stmt.setString(3, TaskState.RUNNING.getCode());
185: int updatedRows = stmt.executeUpdate();
186: if (log.isDebugEnabled())
187: log
188: .debug("Number of tasks marked as 'interrupted by shutdown': "
189: + updatedRows);
190: } catch (Throwable e) {
191: throw new Exception(
192: "Error while marking tasks as 'interrupted by shutdown'",
193: e);
194: } finally {
195: jdbcHelper.closeStatement(stmt);
196: jdbcHelper.closeConnection(conn);
197: }
198: }
199:
200: public long runTask(DocumentSelection documentSelection,
201: TaskSpecification taskSpecification, Repository repository)
202: throws TaskException {
203: long taskId;
204: {
205: Connection conn = null;
206: PreparedStatement stmt = null;
207: try {
208: taskId = taskCounter.getNextId();
209: java.util.Date now = new java.util.Date();
210:
211: conn = dataSource.getConnection();
212: stmt = conn
213: .prepareStatement("insert into document_tasks(id, owner, state, started_at, progress, description, script, scriptlanguage) values(?,?,?,?,?,?,?,?)");
214: stmt.setLong(1, taskId);
215: stmt.setLong(2, repository.getUserId());
216: stmt.setString(3, TaskState.INITIALISING.getCode());
217: stmt.setTimestamp(4, new Timestamp(now.getTime()));
218: stmt.setString(5, "initialising");
219: stmt.setString(6, taskSpecification.getDescription());
220: stmt.setString(7, taskSpecification.getScript());
221: stmt
222: .setString(8, taskSpecification
223: .getScriptLanguage());
224: stmt.execute();
225: } catch (Throwable e) {
226: throw new TaskException("Error inserting task record.",
227: e);
228: } finally {
229: jdbcHelper.closeStatement(stmt);
230: jdbcHelper.closeConnection(conn);
231: }
232: }
233:
234: try {
235: TaskContextImpl taskContext = new TaskContextImpl(taskId);
236: TaskRunner taskRunner = new TaskRunner(documentSelection,
237: taskSpecification, taskContext, repository);
238: Thread thread = new Thread(taskRunner);
239: TaskHolder taskHolder = new TaskHolder(taskRunner,
240: taskContext, repository.getUserId(), thread);
241: tasksById.put(new Long(taskId), taskHolder);
242:
243: thread.start();
244: } catch (Throwable e) {
245: tasksById.remove(new Long(taskId));
246: Connection conn = null;
247: PreparedStatement stmt = null;
248: try {
249: conn = dataSource.getConnection();
250: stmt = conn
251: .prepareStatement("delete from document_tasks where id = ?");
252: stmt.setLong(1, taskId);
253: stmt.execute();
254: } catch (Exception e2) {
255: throw new TaskException(
256: "Problem starting task and problem cleaning up afterwards: "
257: + e.toString(), e2);
258: } finally {
259: jdbcHelper.closeStatement(stmt);
260: jdbcHelper.closeConnection(conn);
261: }
262: throw new TaskException("Problem starting task.", e);
263: }
264:
265: return taskId;
266: }
267:
268: static class TaskHolder {
269: private final TaskRunner taskRunner;
270: private final TaskContextImpl taskContext;
271: private final long ownerId;
272: private final Thread thread;
273:
274: public TaskHolder(TaskRunner taskRunner,
275: TaskContextImpl taskContext, long ownerId, Thread thread) {
276: this .taskRunner = taskRunner;
277: this .taskContext = taskContext;
278: this .ownerId = ownerId;
279: this .thread = thread;
280: }
281:
282: public TaskRunner getTaskRunner() {
283: return taskRunner;
284: }
285:
286: public TaskContextImpl getTaskContext() {
287: return taskContext;
288: }
289:
290: public long getOwnerId() {
291: return ownerId;
292: }
293:
294: public Thread getThread() {
295: return thread;
296: }
297: }
298:
299: class TaskContextImpl implements TaskContext {
300: private TaskState interruptedReason = null;
301: private long taskId;
302:
303: public TaskContextImpl(long taskId) {
304: this .taskId = taskId;
305: }
306:
307: public synchronized void interrupt(TaskState reason) {
308: if (this .interruptedReason == null) {
309: this .interruptedReason = reason;
310: } else {
311: // if already interrupted, silently return
312: }
313: }
314:
315: public boolean isInterrupted() {
316: return interruptedReason != null;
317: }
318:
319: public TaskState getInterruptedReason() {
320: return interruptedReason;
321: }
322:
323: public void setProgress(String progress) {
324: Connection conn = null;
325: PreparedStatement stmt = null;
326: try {
327: conn = dataSource.getConnection();
328: stmt = conn
329: .prepareStatement("update document_tasks set progress = ? where id = ?");
330: stmt.setString(1, progress);
331: stmt.setLong(2, taskId);
332: stmt.execute();
333: } catch (Throwable e) {
334: throw new RuntimeException(
335: "Unexpected error trying to update task progress.",
336: e);
337: } finally {
338: jdbcHelper.closeStatement(stmt);
339: jdbcHelper.closeConnection(conn);
340: }
341: }
342:
343: public void initDocumentResults(VariantKey[] keys,
344: Repository repository) {
345: Connection conn = null;
346: PreparedStatement stmt = null;
347: try {
348: conn = dataSource.getConnection();
349: jdbcHelper.startTransaction(conn);
350:
351: stmt = conn
352: .prepareStatement("insert into task_doc_details(task_id, doc_id, branch_id, lang_id, seqnr, state) values(?,?,?,?,?,?)");
353: stmt.setLong(1, taskId);
354: stmt.setString(6, DocumentExecutionState.WAITING
355: .getCode());
356:
357: for (int i = 0; i < keys.length; i++) {
358: stmt.setString(2, keys[i].getDocumentId());
359: stmt.setLong(3, keys[i].getBranchId());
360: stmt.setLong(4, keys[i].getLanguageId());
361: stmt.setLong(5, i);
362: stmt.execute();
363: }
364: conn.commit();
365: } catch (Throwable e) {
366: jdbcHelper.rollback(conn);
367: throw new RuntimeException(
368: "Unexpected error trying to initialise document states.",
369: e);
370: } finally {
371: jdbcHelper.closeStatement(stmt);
372: jdbcHelper.closeConnection(conn);
373: }
374: }
375:
376: public void setDocumentResult(VariantKey key,
377: DocumentExecutionState state, String details) {
378: Connection conn = null;
379: PreparedStatement stmt = null;
380: try {
381: conn = dataSource.getConnection();
382: stmt = conn
383: .prepareStatement("update task_doc_details set state = ?, details = ? where task_id = ? and doc_id = ? and branch_id = ? and lang_id = ?");
384: stmt.setString(1, state.getCode());
385: stmt.setString(2, details);
386: stmt.setLong(3, taskId);
387: stmt.setString(4, key.getDocumentId());
388: stmt.setLong(5, key.getBranchId());
389: stmt.setLong(6, key.getLanguageId());
390: stmt.execute();
391: } catch (Throwable e) {
392: throw new RuntimeException(
393: "Unexpected error trying to update document state.",
394: e);
395: } finally {
396: jdbcHelper.closeStatement(stmt);
397: jdbcHelper.closeConnection(conn);
398: }
399: }
400:
401: public void setTaskState(TaskState state, String progress,
402: String details) {
403: Connection conn = null;
404: PreparedStatement stmt = null;
405: try {
406: conn = dataSource.getConnection();
407: stmt = conn
408: .prepareStatement("update document_tasks set state = ?, progress = ?, details = ?, finished_at = ? where id = ?");
409: stmt.setString(1, state.getCode());
410: stmt.setString(2, progress);
411: stmt.setString(3, details);
412: stmt.setTimestamp(4,
413: state.isStoppedState() ? new Timestamp(System
414: .currentTimeMillis()) : null);
415: stmt.setLong(5, taskId);
416: stmt.execute();
417: } catch (Throwable e) {
418: throw new RuntimeException(
419: "Unexpected error trying to update task state.",
420: e);
421: } finally {
422: jdbcHelper.closeStatement(stmt);
423: jdbcHelper.closeConnection(conn);
424: }
425: }
426:
427: public void cleanup() {
428: tasksById.remove(new Long(taskId));
429: }
430:
431: public Lock getExecutionLock() {
432: return suspendLock.readLock();
433: }
434: }
435:
436: private static final String SELECT_TASK = "select id, scriptlanguage, owner, started_at, finished_at, state, progress, description, script, details from document_tasks";
437:
438: public Task getTask(long taskId, Repository repository)
439: throws TaskException {
440: Connection conn = null;
441: PreparedStatement stmt = null;
442: try {
443: conn = dataSource.getConnection();
444: stmt = conn.prepareStatement(SELECT_TASK + " where id = ?");
445: stmt.setLong(1, taskId);
446: ResultSet rs = stmt.executeQuery();
447:
448: if (!rs.next())
449: throw new TaskException("No task found with ID "
450: + taskId);
451:
452: if (!repository.isInRole(Role.ADMINISTRATOR)
453: && rs.getLong("owner") != repository.getUserId())
454: throw new TaskException(
455: "Access denied to task with ID " + taskId);
456:
457: return getTaskFromResultSet(rs);
458: } catch (Throwable e) {
459: if (e instanceof TaskException)
460: throw (TaskException) e;
461:
462: throw new TaskException("Error loading task with ID "
463: + taskId, e);
464: } finally {
465: jdbcHelper.closeStatement(stmt);
466: jdbcHelper.closeConnection(conn);
467: }
468: }
469:
470: public Tasks getTasks(Repository repository) throws TaskException {
471: Connection conn = null;
472: PreparedStatement stmt = null;
473: try {
474: conn = dataSource.getConnection();
475: StringBuilder query = new StringBuilder(SELECT_TASK);
476: if (!repository.isInRole(Role.ADMINISTRATOR))
477: query.append(" where owner = ?");
478: stmt = conn.prepareStatement(query.toString());
479: if (!repository.isInRole(Role.ADMINISTRATOR))
480: stmt.setLong(1, repository.getUserId());
481: ResultSet rs = stmt.executeQuery();
482:
483: List<Task> tasks = new ArrayList<Task>();
484:
485: while (rs.next()) {
486: tasks.add(getTaskFromResultSet(rs));
487: }
488:
489: return new TasksImpl(tasks.toArray(new Task[tasks.size()]));
490: } catch (Throwable e) {
491: throw new TaskException("Error loading tasks.", e);
492: } finally {
493: jdbcHelper.closeStatement(stmt);
494: jdbcHelper.closeConnection(conn);
495: }
496: }
497:
498: private TaskImpl getTaskFromResultSet(ResultSet rs)
499: throws SQLException {
500: long taskId = rs.getLong("id");
501: String description = rs.getString("description");
502: TaskState state = TaskState.getByCode(rs.getString("state"));
503: long ownerId = rs.getLong("owner");
504: String progress = rs.getString("progress");
505: String details = rs.getString("details");
506: String script = rs.getString("script");
507: String scriptLanguage = rs.getString("scriptlanguage");
508: Date startedAt = rs.getTimestamp("started_at");
509: Date finishedAt = rs.getTimestamp("finished_at");
510:
511: TaskImpl task = new TaskImpl(taskId, description, state,
512: ownerId, progress, details, script, scriptLanguage,
513: startedAt, finishedAt);
514:
515: return task;
516: }
517:
518: public void deleteTask(long taskId, Repository repository)
519: throws TaskException {
520: Connection conn = null;
521: PreparedStatement stmt = null;
522: try {
523: conn = dataSource.getConnection();
524: jdbcHelper.startTransaction(conn);
525:
526: stmt = conn
527: .prepareStatement("select state, owner from document_tasks where id = ? "
528: + jdbcHelper.getSharedLockClause());
529: stmt.setLong(1, taskId);
530: ResultSet rs = stmt.executeQuery();
531:
532: if (!rs.next())
533: throw new TaskException("No task found with ID "
534: + taskId);
535:
536: if (!repository.isInRole(Role.ADMINISTRATOR)
537: && rs.getLong("owner") != repository.getUserId())
538: throw new TaskException(
539: "Access denied to task with ID " + taskId);
540:
541: TaskState state = TaskState
542: .getByCode(rs.getString("state"));
543: if (state == TaskState.INITIALISING
544: || state == TaskState.RUNNING)
545: throw new TaskException("Cannot delete task with ID "
546: + taskId + " because it has not yet ended.");
547:
548: stmt.close();
549:
550: deleteTask(taskId, conn);
551:
552: conn.commit();
553: } catch (Throwable e) {
554: jdbcHelper.rollback(conn);
555: if (e instanceof TaskException)
556: throw (TaskException) e;
557:
558: throw new TaskException("Problem deleting task with ID "
559: + taskId, e);
560: } finally {
561: jdbcHelper.closeStatement(stmt);
562: jdbcHelper.closeConnection(conn);
563: }
564: }
565:
566: /**
567: * Pefroms actual deletion of task, assumes necessary locks are taken and transaction is started.
568: */
569: private void deleteTask(long taskId, Connection conn)
570: throws SQLException {
571: PreparedStatement stmt = null;
572: try {
573: stmt = conn
574: .prepareStatement("delete from task_doc_details where task_id = ?");
575: stmt.setLong(1, taskId);
576: stmt.execute();
577: stmt.close();
578:
579: stmt = conn
580: .prepareStatement("delete from document_tasks where id = ?");
581: stmt.setLong(1, taskId);
582: stmt.execute();
583: stmt.close();
584: } finally {
585: jdbcHelper.closeStatement(stmt);
586: }
587:
588: }
589:
590: public void interruptTask(long taskId, Repository repository)
591: throws TaskException {
592: Long taskKey = new Long(taskId);
593: TaskHolder taskHolder = tasksById.get(taskKey);
594:
595: if (taskHolder == null)
596: throw new TaskException("There is no task running with ID "
597: + taskId);
598:
599: if (!repository.isInRole(Role.ADMINISTRATOR)
600: && taskHolder.getOwnerId() != repository.getUserId())
601: throw new TaskException(
602: "You are not allowed to interrupt the task with ID "
603: + taskId);
604:
605: taskHolder.getTaskContext().interrupt(
606: TaskState.INTERRUPTED_BY_USER);
607: }
608:
609: public TaskDocDetails getTaskDocDetails(long taskId,
610: Repository repository) throws TaskException {
611: // Do a call to getTask so that existence and access permissions are verified
612: getTask(taskId, repository);
613:
614: Connection conn = null;
615: PreparedStatement stmt = null;
616: try {
617: conn = dataSource.getConnection();
618: stmt = conn
619: .prepareStatement("select doc_id, branch_id, lang_id, state, details from task_doc_details where task_id = ? order by seqnr");
620: stmt.setLong(1, taskId);
621: ResultSet rs = stmt.executeQuery();
622:
623: List<TaskDocDetail> taskDocDetails = new ArrayList<TaskDocDetail>();
624: while (rs.next()) {
625: VariantKey variantKey = new VariantKey(rs
626: .getString("doc_id"), rs.getLong("branch_id"),
627: rs.getLong("lang_id"));
628: DocumentExecutionState state = DocumentExecutionState
629: .getByCode(rs.getString("state"));
630: String details = rs.getString("details");
631: taskDocDetails.add(new TaskDocDetailImpl(variantKey,
632: state, details));
633: }
634:
635: return new TaskDocDetailsImpl(taskDocDetails
636: .toArray(new TaskDocDetail[taskDocDetails.size()]));
637: } catch (Throwable e) {
638: throw new TaskException(
639: "Error retrieving task document details for task "
640: + taskId, e);
641: } finally {
642: jdbcHelper.closeStatement(stmt);
643: jdbcHelper.closeConnection(conn);
644: }
645: }
646:
647: class ExpiredTasksJanitor implements Runnable {
648: public void run() {
649: try {
650: while (true) {
651: if (Thread.interrupted())
652: return;
653:
654: Thread.sleep(taskJanitorRunInterval);
655:
656: Connection conn = null;
657: PreparedStatement stmt = null;
658: try {
659: conn = dataSource.getConnection();
660: jdbcHelper.startTransaction(conn);
661:
662: // Note: the search is performed on started_at and not on finished_at because finished_at may
663: // not always have a value (e.g. when the task was interrupted by shutdown)
664: stmt = conn
665: .prepareStatement("select id from document_tasks where started_at < ? and state not in ('"
666: + TaskState.INITIALISING
667: .getCode()
668: + "', '"
669: + TaskState.RUNNING.getCode()
670: + "') "
671: + jdbcHelper
672: .getSharedLockClause());
673: stmt.setTimestamp(1, new Timestamp(System
674: .currentTimeMillis()
675: - taskJanitorTaskMaxAge));
676: ResultSet rs = stmt.executeQuery();
677: List<Long> taskIds = new ArrayList<Long>();
678: while (rs.next()) {
679: taskIds.add(new Long(rs.getLong(1)));
680: }
681: stmt.close();
682:
683: for (Long taskId : taskIds) {
684: deleteTask(taskId, conn);
685: }
686:
687: conn.commit();
688: } catch (Throwable e) {
689: jdbcHelper.rollback(conn);
690: log
691: .error(
692: "Expired tasks janitor: error while performing my job.",
693: e);
694: } finally {
695: jdbcHelper.closeStatement(stmt);
696: jdbcHelper.closeConnection(conn);
697: }
698: }
699: } catch (InterruptedException e) {
700: // ignore
701: } finally {
702: log
703: .debug("Expired document task janitor thread ended.");
704: }
705: }
706: }
707:
708: }
|