0001: /*
0002: * Copyright 2004-2005 OpenSymphony
0003: *
0004: * Licensed under the Apache License, Version 2.0 (the "License"); you may not
0005: * use this file except in compliance with the License. You may obtain a copy
0006: * of the License at
0007: *
0008: * http://www.apache.org/licenses/LICENSE-2.0
0009: *
0010: * Unless required by applicable law or agreed to in writing, software
0011: * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
0012: * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
0013: * License for the specific language governing permissions and limitations
0014: * under the License.
0015: *
0016: */
0017:
0018: /*
0019: * Previously Copyright (c) 2001-2004 James House
0020: */
0021: package org.quartz.impl.jdbcjobstore;
0022:
0023: import java.io.IOException;
0024: import java.lang.reflect.Constructor;
0025: import java.lang.reflect.InvocationHandler;
0026: import java.lang.reflect.InvocationTargetException;
0027: import java.lang.reflect.Proxy;
0028: import java.sql.Connection;
0029: import java.sql.SQLException;
0030: import java.util.ArrayList;
0031: import java.util.Date;
0032: import java.util.HashMap;
0033: import java.util.HashSet;
0034: import java.util.Iterator;
0035: import java.util.LinkedList;
0036: import java.util.List;
0037: import java.util.Set;
0038:
0039: import org.apache.commons.logging.Log;
0040: import org.apache.commons.logging.LogFactory;
0041: import org.quartz.Calendar;
0042: import org.quartz.CronTrigger;
0043: import org.quartz.JobDataMap;
0044: import org.quartz.JobDetail;
0045: import org.quartz.JobPersistenceException;
0046: import org.quartz.ObjectAlreadyExistsException;
0047: import org.quartz.Scheduler;
0048: import org.quartz.SchedulerConfigException;
0049: import org.quartz.SchedulerException;
0050: import org.quartz.SimpleTrigger;
0051: import org.quartz.Trigger;
0052: import org.quartz.core.SchedulingContext;
0053: import org.quartz.spi.ClassLoadHelper;
0054: import org.quartz.spi.JobStore;
0055: import org.quartz.spi.SchedulerSignaler;
0056: import org.quartz.spi.TriggerFiredBundle;
0057: import org.quartz.utils.DBConnectionManager;
0058: import org.quartz.utils.Key;
0059: import org.quartz.utils.TriggerStatus;
0060:
0061: /**
0062: * <p>
0063: * Contains base functionality for JDBC-based JobStore implementations.
0064: * </p>
0065: *
0066: * @author <a href="mailto:jeff@binaryfeed.org">Jeffrey Wescott</a>
0067: * @author James House
0068: */
0069: public abstract class JobStoreSupport implements JobStore, Constants {
0070:
0071: /*
0072: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0073: *
0074: * Constants.
0075: *
0076: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0077: */
0078:
0079: protected static String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";
0080:
0081: protected static String LOCK_JOB_ACCESS = "JOB_ACCESS";
0082:
0083: protected static String LOCK_CALENDAR_ACCESS = "CALENDAR_ACCESS";
0084:
0085: protected static String LOCK_STATE_ACCESS = "STATE_ACCESS";
0086:
0087: protected static String LOCK_MISFIRE_ACCESS = "MISFIRE_ACCESS";
0088:
0089: /*
0090: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0091: *
0092: * Data members.
0093: *
0094: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0095: */
0096:
0097: protected String dsName;
0098:
0099: protected String tablePrefix = DEFAULT_TABLE_PREFIX;
0100:
0101: protected boolean useProperties = false;
0102:
0103: protected String instanceId;
0104:
0105: protected String instanceName;
0106:
0107: protected String delegateClassName;
0108: protected Class delegateClass = StdJDBCDelegate.class;
0109:
0110: protected HashMap calendarCache = new HashMap();
0111:
0112: private DriverDelegate delegate;
0113:
0114: private long misfireThreshold = 60000L; // one minute
0115:
0116: private boolean dontSetAutoCommitFalse = false;
0117:
0118: private boolean isClustered = false;
0119:
0120: private boolean useDBLocks = false;
0121:
0122: private boolean lockOnInsert = true;
0123:
0124: private Semaphore lockHandler = null; // set in initialize() method...
0125:
0126: private String selectWithLockSQL = null;
0127:
0128: private long clusterCheckinInterval = 7500L;
0129:
0130: private ClusterManager clusterManagementThread = null;
0131:
0132: private MisfireHandler misfireHandler = null;
0133:
0134: private ClassLoadHelper classLoadHelper;
0135:
0136: private SchedulerSignaler signaler;
0137:
0138: protected int maxToRecoverAtATime = 20;
0139:
0140: private boolean setTxIsolationLevelSequential = false;
0141:
0142: private long dbRetryInterval = 10000;
0143:
0144: private boolean makeThreadsDaemons = false;
0145:
0146: private boolean doubleCheckLockMisfireHandler = true;
0147:
0148: private final Log log = LogFactory.getLog(getClass());
0149:
0150: /*
0151: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0152: *
0153: * Interface.
0154: *
0155: * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0156: */
0157:
0158: /**
0159: * <p>
0160: * Set the name of the <code>DataSource</code> that should be used for
0161: * performing database functions.
0162: * </p>
0163: */
0164: public void setDataSource(String dsName) {
0165: this .dsName = dsName;
0166: }
0167:
0168: /**
0169: * <p>
0170: * Get the name of the <code>DataSource</code> that should be used for
0171: * performing database functions.
0172: * </p>
0173: */
0174: public String getDataSource() {
0175: return dsName;
0176: }
0177:
0178: /**
0179: * <p>
0180: * Set the prefix that should be pre-pended to all table names.
0181: * </p>
0182: */
0183: public void setTablePrefix(String prefix) {
0184: if (prefix == null) {
0185: prefix = "";
0186: }
0187:
0188: this .tablePrefix = prefix;
0189: }
0190:
0191: /**
0192: * <p>
0193: * Get the prefix that should be pre-pended to all table names.
0194: * </p>
0195: */
0196: public String getTablePrefix() {
0197: return tablePrefix;
0198: }
0199:
0200: /**
0201: * <p>
0202: * Set whether String-only properties will be handled in JobDataMaps.
0203: * </p>
0204: */
0205: public void setUseProperties(String useProp) {
0206: if (useProp == null) {
0207: useProp = "false";
0208: }
0209:
0210: this .useProperties = Boolean.valueOf(useProp).booleanValue();
0211: }
0212:
0213: /**
0214: * <p>
0215: * Get whether String-only properties will be handled in JobDataMaps.
0216: * </p>
0217: */
0218: public boolean canUseProperties() {
0219: return useProperties;
0220: }
0221:
0222: /**
0223: * <p>
0224: * Set the instance Id of the Scheduler (must be unique within a cluster).
0225: * </p>
0226: */
0227: public void setInstanceId(String instanceId) {
0228: this .instanceId = instanceId;
0229: }
0230:
0231: /**
0232: * <p>
0233: * Get the instance Id of the Scheduler (must be unique within a cluster).
0234: * </p>
0235: */
0236: public String getInstanceId() {
0237:
0238: return instanceId;
0239: }
0240:
0241: /**
0242: * Set the instance name of the Scheduler (must be unique within this server instance).
0243: */
0244: public void setInstanceName(String instanceName) {
0245: this .instanceName = instanceName;
0246: }
0247:
0248: /**
0249: * Get the instance name of the Scheduler (must be unique within this server instance).
0250: */
0251: public String getInstanceName() {
0252:
0253: return instanceName;
0254: }
0255:
0256: /**
0257: * <p>
0258: * Set whether this instance is part of a cluster.
0259: * </p>
0260: */
0261: public void setIsClustered(boolean isClustered) {
0262: this .isClustered = isClustered;
0263: }
0264:
0265: /**
0266: * <p>
0267: * Get whether this instance is part of a cluster.
0268: * </p>
0269: */
0270: public boolean isClustered() {
0271: return isClustered;
0272: }
0273:
0274: /**
0275: * <p>
0276: * Get the frequency (in milliseconds) at which this instance "checks-in"
0277: * with the other instances of the cluster. -- Affects the rate of
0278: * detecting failed instances.
0279: * </p>
0280: */
0281: public long getClusterCheckinInterval() {
0282: return clusterCheckinInterval;
0283: }
0284:
0285: /**
0286: * <p>
0287: * Set the frequency (in milliseconds) at which this instance "checks-in"
0288: * with the other instances of the cluster. -- Affects the rate of
0289: * detecting failed instances.
0290: * </p>
0291: */
0292: public void setClusterCheckinInterval(long l) {
0293: clusterCheckinInterval = l;
0294: }
0295:
0296: /**
0297: * <p>
0298: * Get the maximum number of misfired triggers that the misfire handling
0299: * thread will try to recover at one time (within one transaction). The
0300: * default is 20.
0301: * </p>
0302: */
0303: public int getMaxMisfiresToHandleAtATime() {
0304: return maxToRecoverAtATime;
0305: }
0306:
0307: /**
0308: * <p>
0309: * Set the maximum number of misfired triggers that the misfire handling
0310: * thread will try to recover at one time (within one transaction). The
0311: * default is 20.
0312: * </p>
0313: */
0314: public void setMaxMisfiresToHandleAtATime(int maxToRecoverAtATime) {
0315: this .maxToRecoverAtATime = maxToRecoverAtATime;
0316: }
0317:
0318: /**
0319: * @return Returns the dbRetryInterval.
0320: */
0321: public long getDbRetryInterval() {
0322: return dbRetryInterval;
0323: }
0324:
0325: /**
0326: * @param dbRetryInterval The dbRetryInterval to set.
0327: */
0328: public void setDbRetryInterval(long dbRetryInterval) {
0329: this .dbRetryInterval = dbRetryInterval;
0330: }
0331:
0332: /**
0333: * <p>
0334: * Set whether this instance should use database-based thread
0335: * synchronization.
0336: * </p>
0337: */
0338: public void setUseDBLocks(boolean useDBLocks) {
0339: this .useDBLocks = useDBLocks;
0340: }
0341:
0342: /**
0343: * <p>
0344: * Get whether this instance should use database-based thread
0345: * synchronization.
0346: * </p>
0347: */
0348: public boolean getUseDBLocks() {
0349: return useDBLocks;
0350: }
0351:
0352: public boolean isLockOnInsert() {
0353: return lockOnInsert;
0354: }
0355:
0356: /**
0357: * Whether or not to obtain locks when inserting new jobs/triggers.
0358: * Defaults to <code>true</code>, which is safest - some db's (such as
0359: * MS SQLServer) seem to require this to avoid deadlocks under high load,
0360: * while others seem to do fine without.
0361: *
0362: * <p>Setting this property to <code>false</code> will provide a
0363: * significant performance increase during the addition of new jobs
0364: * and triggers.</p>
0365: *
0366: * @param lockOnInsert
0367: */
0368: public void setLockOnInsert(boolean lockOnInsert) {
0369: this .lockOnInsert = lockOnInsert;
0370: }
0371:
0372: public long getMisfireThreshold() {
0373: return misfireThreshold;
0374: }
0375:
0376: /**
0377: * The the number of milliseconds by which a trigger must have missed its
0378: * next-fire-time, in order for it to be considered "misfired" and thus
0379: * have its misfire instruction applied.
0380: *
0381: * @param misfireThreshold
0382: */
0383: public void setMisfireThreshold(long misfireThreshold) {
0384: if (misfireThreshold < 1) {
0385: throw new IllegalArgumentException(
0386: "Misfirethreshold must be larger than 0");
0387: }
0388: this .misfireThreshold = misfireThreshold;
0389: }
0390:
0391: public boolean isDontSetAutoCommitFalse() {
0392: return dontSetAutoCommitFalse;
0393: }
0394:
0395: /**
0396: * Don't call set autocommit(false) on connections obtained from the
0397: * DataSource. This can be helpfull in a few situations, such as if you
0398: * have a driver that complains if it is called when it is already off.
0399: *
0400: * @param b
0401: */
0402: public void setDontSetAutoCommitFalse(boolean b) {
0403: dontSetAutoCommitFalse = b;
0404: }
0405:
0406: public boolean isTxIsolationLevelSerializable() {
0407: return setTxIsolationLevelSequential;
0408: }
0409:
0410: /**
0411: * Set the transaction isolation level of DB connections to sequential.
0412: *
0413: * @param b
0414: */
0415: public void setTxIsolationLevelSerializable(boolean b) {
0416: setTxIsolationLevelSequential = b;
0417: }
0418:
0419: /**
0420: * <p>
0421: * Set the JDBC driver delegate class.
0422: * </p>
0423: *
0424: * @param delegateClassName
0425: * the delegate class name
0426: */
0427: public void setDriverDelegateClass(String delegateClassName)
0428: throws InvalidConfigurationException {
0429: this .delegateClassName = delegateClassName;
0430: }
0431:
0432: /**
0433: * <p>
0434: * Get the JDBC driver delegate class name.
0435: * </p>
0436: *
0437: * @return the delegate class name
0438: */
0439: public String getDriverDelegateClass() {
0440: return delegateClassName;
0441: }
0442:
0443: public String getSelectWithLockSQL() {
0444: return selectWithLockSQL;
0445: }
0446:
0447: /**
0448: * <p>
0449: * set the SQL statement to use to select and lock a row in the "locks"
0450: * table.
0451: * </p>
0452: *
0453: * @see StdRowLockSemaphore
0454: */
0455: public void setSelectWithLockSQL(String string) {
0456: selectWithLockSQL = string;
0457: }
0458:
0459: protected ClassLoadHelper getClassLoadHelper() {
0460: return classLoadHelper;
0461: }
0462:
0463: /**
0464: * Get whether the threads spawned by this JobStore should be
0465: * marked as daemon. Possible threads include the <code>MisfireHandler</code>
0466: * and the <code>ClusterManager</code>.
0467: *
0468: * @see Thread#setDaemon(boolean)
0469: */
0470: public boolean getMakeThreadsDaemons() {
0471: return makeThreadsDaemons;
0472: }
0473:
0474: /**
0475: * Set whether the threads spawned by this JobStore should be
0476: * marked as daemon. Possible threads include the <code>MisfireHandler</code>
0477: * and the <code>ClusterManager</code>.
0478: *
0479: * @see Thread#setDaemon(boolean)
0480: */
0481: public void setMakeThreadsDaemons(boolean makeThreadsDaemons) {
0482: this .makeThreadsDaemons = makeThreadsDaemons;
0483: }
0484:
0485: /**
0486: * Get whether to check to see if there are Triggers that have misfired
0487: * before actually acquiring the lock to recover them. This should be
0488: * set to false if the majority of the time, there are are misfired
0489: * Triggers.
0490: */
0491: public boolean getDoubleCheckLockMisfireHandler() {
0492: return doubleCheckLockMisfireHandler;
0493: }
0494:
0495: /**
0496: * Set whether to check to see if there are Triggers that have misfired
0497: * before actually acquiring the lock to recover them. This should be
0498: * set to false if the majority of the time, there are are misfired
0499: * Triggers.
0500: */
0501: public void setDoubleCheckLockMisfireHandler(
0502: boolean doubleCheckLockMisfireHandler) {
0503: this .doubleCheckLockMisfireHandler = doubleCheckLockMisfireHandler;
0504: }
0505:
0506: //---------------------------------------------------------------------------
0507: // interface methods
0508: //---------------------------------------------------------------------------
0509:
0510: protected Log getLog() {
0511: return log;
0512: }
0513:
0514: /**
0515: * <p>
0516: * Called by the QuartzScheduler before the <code>JobStore</code> is
0517: * used, in order to give it a chance to initialize.
0518: * </p>
0519: */
0520: public void initialize(ClassLoadHelper loadHelper,
0521: SchedulerSignaler signaler) throws SchedulerConfigException {
0522:
0523: if (dsName == null) {
0524: throw new SchedulerConfigException(
0525: "DataSource name not set.");
0526: }
0527:
0528: classLoadHelper = loadHelper;
0529: this .signaler = signaler;
0530:
0531: // If the user hasn't specified an explicit lock handler, then
0532: // choose one based on CMT/Clustered/UseDBLocks.
0533: if (getLockHandler() == null) {
0534:
0535: // If the user hasn't specified an explicit lock handler,
0536: // then we *must* use DB locks with clustering
0537: if (isClustered()) {
0538: setUseDBLocks(true);
0539: }
0540:
0541: if (getUseDBLocks()) {
0542: getLog()
0543: .info(
0544: "Using db table-based data access locking (synchronization).");
0545: setLockHandler(new StdRowLockSemaphore(
0546: getTablePrefix(), getSelectWithLockSQL()));
0547: } else {
0548: getLog()
0549: .info(
0550: "Using thread monitor-based data access locking (synchronization).");
0551: setLockHandler(new SimpleSemaphore());
0552: }
0553: }
0554:
0555: if (!isClustered()) {
0556: try {
0557: cleanVolatileTriggerAndJobs();
0558: } catch (SchedulerException se) {
0559: throw new SchedulerConfigException(
0560: "Failure occured during job recovery.", se);
0561: }
0562: }
0563: }
0564:
0565: /**
0566: * @see org.quartz.spi.JobStore#schedulerStarted()
0567: */
0568: public void schedulerStarted() throws SchedulerException {
0569:
0570: if (isClustered()) {
0571: clusterManagementThread = new ClusterManager();
0572: clusterManagementThread.initialize();
0573: } else {
0574: try {
0575: recoverJobs();
0576: } catch (SchedulerException se) {
0577: throw new SchedulerConfigException(
0578: "Failure occured during job recovery.", se);
0579: }
0580: }
0581:
0582: misfireHandler = new MisfireHandler();
0583: misfireHandler.initialize();
0584: }
0585:
0586: /**
0587: * <p>
0588: * Called by the QuartzScheduler to inform the <code>JobStore</code> that
0589: * it should free up all of it's resources because the scheduler is
0590: * shutting down.
0591: * </p>
0592: */
0593: public void shutdown() {
0594: if (clusterManagementThread != null) {
0595: clusterManagementThread.shutdown();
0596: }
0597:
0598: if (misfireHandler != null) {
0599: misfireHandler.shutdown();
0600: }
0601:
0602: try {
0603: DBConnectionManager.getInstance().shutdown(getDataSource());
0604: } catch (SQLException sqle) {
0605: getLog().warn("Database connection shutdown unsuccessful.",
0606: sqle);
0607: }
0608: }
0609:
0610: public boolean supportsPersistence() {
0611: return true;
0612: }
0613:
0614: //---------------------------------------------------------------------------
0615: // helper methods for subclasses
0616: //---------------------------------------------------------------------------
0617:
0618: protected abstract Connection getNonManagedTXConnection()
0619: throws JobPersistenceException;
0620:
0621: /**
0622: * Wrap the given <code>Connection</code> in a Proxy such that attributes
0623: * that might be set will be restored before the connection is closed
0624: * (and potentially restored to a pool).
0625: */
0626: protected Connection getAttributeRestoringConnection(Connection conn) {
0627: return (Connection) Proxy
0628: .newProxyInstance(
0629: Thread.currentThread().getContextClassLoader(),
0630: new Class[] { Connection.class },
0631: new AttributeRestoringConnectionInvocationHandler(
0632: conn));
0633: }
0634:
0635: protected Connection getConnection() throws JobPersistenceException {
0636: Connection conn = null;
0637: try {
0638: conn = DBConnectionManager.getInstance().getConnection(
0639: getDataSource());
0640: } catch (SQLException sqle) {
0641: throw new JobPersistenceException(
0642: "Failed to obtain DB connection from data source '"
0643: + getDataSource() + "': " + sqle.toString(),
0644: sqle);
0645: } catch (Throwable e) {
0646: throw new JobPersistenceException(
0647: "Failed to obtain DB connection from data source '"
0648: + getDataSource() + "': " + e.toString(),
0649: e,
0650: JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE);
0651: }
0652:
0653: if (conn == null) {
0654: throw new JobPersistenceException(
0655: "Could not get connection from DataSource '"
0656: + getDataSource() + "'");
0657: }
0658:
0659: // Protect connection attributes we might change.
0660: conn = getAttributeRestoringConnection(conn);
0661:
0662: // Set any connection connection attributes we are to override.
0663: try {
0664: if (!isDontSetAutoCommitFalse()) {
0665: conn.setAutoCommit(false);
0666: }
0667:
0668: if (isTxIsolationLevelSerializable()) {
0669: conn
0670: .setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
0671: }
0672: } catch (SQLException sqle) {
0673: getLog()
0674: .warn(
0675: "Failed to override connection auto commit/transaction isolation.",
0676: sqle);
0677: } catch (Throwable e) {
0678: try {
0679: conn.close();
0680: } catch (Throwable tt) {
0681: }
0682:
0683: throw new JobPersistenceException(
0684: "Failure setting up connection.", e);
0685: }
0686:
0687: return conn;
0688: }
0689:
0690: protected void releaseLock(Connection conn, String lockName,
0691: boolean doIt) {
0692: if (doIt && conn != null) {
0693: try {
0694: getLockHandler().releaseLock(conn, lockName);
0695: } catch (LockException le) {
0696: getLog().error(
0697: "Error returning lock: " + le.getMessage(), le);
0698: }
0699: }
0700: }
0701:
0702: /**
0703: * Removes all volatile data.
0704: *
0705: * @throws JobPersistenceException If jobs could not be recovered.
0706: */
0707: protected void cleanVolatileTriggerAndJobs()
0708: throws JobPersistenceException {
0709: executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
0710: new VoidTransactionCallback() {
0711: public void execute(Connection conn)
0712: throws JobPersistenceException {
0713: cleanVolatileTriggerAndJobs(conn);
0714: }
0715: });
0716: }
0717:
0718: /**
0719: * <p>
0720: * Removes all volatile data.
0721: * </p>
0722: *
0723: * @throws JobPersistenceException
0724: * if jobs could not be recovered
0725: */
0726: protected void cleanVolatileTriggerAndJobs(Connection conn)
0727: throws JobPersistenceException {
0728: try {
0729: // find volatile jobs & triggers...
0730: Key[] volatileTriggers = getDelegate()
0731: .selectVolatileTriggers(conn);
0732: Key[] volatileJobs = getDelegate().selectVolatileJobs(conn);
0733:
0734: for (int i = 0; i < volatileTriggers.length; i++) {
0735: removeTrigger(conn, null,
0736: volatileTriggers[i].getName(),
0737: volatileTriggers[i].getGroup());
0738: }
0739: getLog().info(
0740: "Removed " + volatileTriggers.length
0741: + " Volatile Trigger(s).");
0742:
0743: for (int i = 0; i < volatileJobs.length; i++) {
0744: removeJob(conn, null, volatileJobs[i].getName(),
0745: volatileJobs[i].getGroup(), true);
0746: }
0747: getLog().info(
0748: "Removed " + volatileJobs.length
0749: + " Volatile Job(s).");
0750:
0751: // clean up any fired trigger entries
0752: getDelegate().deleteVolatileFiredTriggers(conn);
0753:
0754: } catch (Exception e) {
0755: throw new JobPersistenceException(
0756: "Couldn't clean volatile data: " + e.getMessage(),
0757: e);
0758: }
0759: }
0760:
0761: /**
0762: * Recover any failed or misfired jobs and clean up the data store as
0763: * appropriate.
0764: *
0765: * @throws JobPersistenceException if jobs could not be recovered
0766: */
0767: protected void recoverJobs() throws JobPersistenceException {
0768: executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
0769: new VoidTransactionCallback() {
0770: public void execute(Connection conn)
0771: throws JobPersistenceException {
0772: recoverJobs(conn);
0773: }
0774: });
0775: }
0776:
0777: /**
0778: * <p>
0779: * Will recover any failed or misfired jobs and clean up the data store as
0780: * appropriate.
0781: * </p>
0782: *
0783: * @throws JobPersistenceException
0784: * if jobs could not be recovered
0785: */
0786: protected void recoverJobs(Connection conn)
0787: throws JobPersistenceException {
0788: try {
0789: // update inconsistent job states
0790: int rows = getDelegate()
0791: .updateTriggerStatesFromOtherStates(conn,
0792: STATE_WAITING, STATE_ACQUIRED,
0793: STATE_BLOCKED);
0794:
0795: rows += getDelegate().updateTriggerStatesFromOtherStates(
0796: conn, STATE_PAUSED, STATE_PAUSED_BLOCKED,
0797: STATE_PAUSED_BLOCKED);
0798:
0799: getLog()
0800: .info(
0801: "Freed "
0802: + rows
0803: + " triggers from 'acquired' / 'blocked' state.");
0804:
0805: // clean up misfired jobs
0806: recoverMisfiredJobs(conn, true);
0807:
0808: // recover jobs marked for recovery that were not fully executed
0809: Trigger[] recoveringJobTriggers = getDelegate()
0810: .selectTriggersForRecoveringJobs(conn);
0811: getLog()
0812: .info(
0813: "Recovering "
0814: + recoveringJobTriggers.length
0815: + " jobs that were in-progress at the time of the last shut-down.");
0816:
0817: for (int i = 0; i < recoveringJobTriggers.length; ++i) {
0818: if (jobExists(conn, recoveringJobTriggers[i]
0819: .getJobName(), recoveringJobTriggers[i]
0820: .getJobGroup())) {
0821: recoveringJobTriggers[i].computeFirstFireTime(null);
0822: storeTrigger(conn, null, recoveringJobTriggers[i],
0823: null, false, STATE_WAITING, false, true);
0824: }
0825: }
0826: getLog().info("Recovery complete.");
0827:
0828: // remove lingering 'complete' triggers...
0829: Key[] ct = getDelegate().selectTriggersInState(conn,
0830: STATE_COMPLETE);
0831: for (int i = 0; ct != null && i < ct.length; i++) {
0832: removeTrigger(conn, null, ct[i].getName(), ct[i]
0833: .getGroup());
0834: }
0835: getLog().info(
0836: "Removed " + ct.length + " 'complete' triggers.");
0837:
0838: // clean up any fired trigger entries
0839: int n = getDelegate().deleteFiredTriggers(conn);
0840: getLog().info("Removed " + n + " stale fired job entries.");
0841: } catch (JobPersistenceException e) {
0842: throw e;
0843: } catch (Exception e) {
0844: throw new JobPersistenceException("Couldn't recover jobs: "
0845: + e.getMessage(), e);
0846: }
0847: }
0848:
0849: protected long getMisfireTime() {
0850: long misfireTime = System.currentTimeMillis();
0851: if (getMisfireThreshold() > 0) {
0852: misfireTime -= getMisfireThreshold();
0853: }
0854:
0855: return (misfireTime > 0) ? misfireTime : 0;
0856: }
0857:
0858: /**
0859: * Helper class for returning the composite result of trying
0860: * to recover misfired jobs.
0861: */
0862: protected static class RecoverMisfiredJobsResult {
0863: public static final RecoverMisfiredJobsResult NO_OP = new RecoverMisfiredJobsResult(
0864: false, 0);
0865:
0866: private boolean _hasMoreMisfiredTriggers;
0867: private int _processedMisfiredTriggerCount;
0868:
0869: public RecoverMisfiredJobsResult(
0870: boolean hasMoreMisfiredTriggers,
0871: int processedMisfiredTriggerCount) {
0872: _hasMoreMisfiredTriggers = hasMoreMisfiredTriggers;
0873: _processedMisfiredTriggerCount = processedMisfiredTriggerCount;
0874: }
0875:
0876: public boolean hasMoreMisfiredTriggers() {
0877: return _hasMoreMisfiredTriggers;
0878: }
0879:
0880: public int getProcessedMisfiredTriggerCount() {
0881: return _processedMisfiredTriggerCount;
0882: }
0883: }
0884:
0885: protected RecoverMisfiredJobsResult recoverMisfiredJobs(
0886: Connection conn, boolean recovering)
0887: throws JobPersistenceException, SQLException {
0888:
0889: // If recovering, we want to handle all of the misfired
0890: // triggers right away.
0891: int maxMisfiresToHandleAtATime = (recovering) ? -1
0892: : getMaxMisfiresToHandleAtATime();
0893:
0894: List misfiredTriggers = new ArrayList();
0895:
0896: // We must still look for the MISFIRED state in case triggers were left
0897: // in this state when upgrading to this version that does not support it.
0898: boolean hasMoreMisfiredTriggers = getDelegate()
0899: .selectMisfiredTriggersInStates(conn, STATE_MISFIRED,
0900: STATE_WAITING, getMisfireTime(),
0901: maxMisfiresToHandleAtATime, misfiredTriggers);
0902:
0903: if (hasMoreMisfiredTriggers) {
0904: getLog()
0905: .info(
0906: "Handling the first "
0907: + misfiredTriggers.size()
0908: + " triggers that missed their scheduled fire-time. "
0909: + "More misfired triggers remain to be processed.");
0910: } else if (misfiredTriggers.size() > 0) {
0911: getLog()
0912: .info(
0913: "Handling "
0914: + misfiredTriggers.size()
0915: + " trigger(s) that missed their scheduled fire-time.");
0916: } else {
0917: getLog()
0918: .debug(
0919: "Found 0 triggers that missed their scheduled fire-time.");
0920: return RecoverMisfiredJobsResult.NO_OP;
0921: }
0922:
0923: for (Iterator misfiredTriggerIter = misfiredTriggers.iterator(); misfiredTriggerIter
0924: .hasNext();) {
0925: Key triggerKey = (Key) misfiredTriggerIter.next();
0926:
0927: Trigger trig = retrieveTrigger(conn, triggerKey.getName(),
0928: triggerKey.getGroup());
0929:
0930: if (trig == null) {
0931: continue;
0932: }
0933:
0934: doUpdateOfMisfiredTrigger(conn, null, trig, false,
0935: STATE_WAITING, recovering);
0936: }
0937:
0938: return new RecoverMisfiredJobsResult(hasMoreMisfiredTriggers,
0939: misfiredTriggers.size());
0940: }
0941:
0942: protected boolean updateMisfiredTrigger(Connection conn,
0943: SchedulingContext ctxt, String triggerName,
0944: String groupName, String newStateIfNotComplete,
0945: boolean forceState) // TODO: probably
0946: // get rid of
0947: // this
0948: throws JobPersistenceException {
0949: try {
0950:
0951: Trigger trig = getDelegate().selectTrigger(conn,
0952: triggerName, groupName);
0953:
0954: long misfireTime = System.currentTimeMillis();
0955: if (getMisfireThreshold() > 0) {
0956: misfireTime -= getMisfireThreshold();
0957: }
0958:
0959: if (trig.getNextFireTime().getTime() > misfireTime) {
0960: return false;
0961: }
0962:
0963: doUpdateOfMisfiredTrigger(conn, ctxt, trig, forceState,
0964: newStateIfNotComplete, false);
0965:
0966: return true;
0967:
0968: } catch (Exception e) {
0969: throw new JobPersistenceException(
0970: "Couldn't update misfired trigger '" + groupName
0971: + "." + triggerName + "': "
0972: + e.getMessage(), e);
0973: }
0974: }
0975:
0976: private void doUpdateOfMisfiredTrigger(Connection conn,
0977: SchedulingContext ctxt, Trigger trig, boolean forceState,
0978: String newStateIfNotComplete, boolean recovering)
0979: throws JobPersistenceException {
0980: Calendar cal = null;
0981: if (trig.getCalendarName() != null) {
0982: cal = retrieveCalendar(conn, ctxt, trig.getCalendarName());
0983: }
0984:
0985: signaler.notifyTriggerListenersMisfired(trig);
0986:
0987: trig.updateAfterMisfire(cal);
0988:
0989: if (trig.getNextFireTime() == null) {
0990: storeTrigger(conn, ctxt, trig, null, true, STATE_COMPLETE,
0991: forceState, recovering);
0992: } else {
0993: storeTrigger(conn, ctxt, trig, null, true,
0994: newStateIfNotComplete, forceState, false);
0995: }
0996: }
0997:
0998: /**
0999: * <p>
1000: * Store the given <code>{@link org.quartz.JobDetail}</code> and <code>{@link org.quartz.Trigger}</code>.
1001: * </p>
1002: *
1003: * @param newJob
1004: * The <code>JobDetail</code> to be stored.
1005: * @param newTrigger
1006: * The <code>Trigger</code> to be stored.
1007: * @throws ObjectAlreadyExistsException
1008: * if a <code>Job</code> with the same name/group already
1009: * exists.
1010: */
1011: public void storeJobAndTrigger(final SchedulingContext ctxt,
1012: final JobDetail newJob, final Trigger newTrigger)
1013: throws ObjectAlreadyExistsException,
1014: JobPersistenceException {
1015: executeInLock((isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
1016: new VoidTransactionCallback() {
1017: public void execute(Connection conn)
1018: throws JobPersistenceException {
1019: if (newJob.isVolatile()
1020: && !newTrigger.isVolatile()) {
1021: JobPersistenceException jpe = new JobPersistenceException(
1022: "Cannot associate non-volatile trigger with a volatile job!");
1023: jpe
1024: .setErrorCode(SchedulerException.ERR_CLIENT_ERROR);
1025: throw jpe;
1026: }
1027:
1028: storeJob(conn, ctxt, newJob, false);
1029: storeTrigger(conn, ctxt, newTrigger, newJob,
1030: false, Constants.STATE_WAITING, false,
1031: false);
1032: }
1033: });
1034: }
1035:
1036: /**
1037: * <p>
1038: * Store the given <code>{@link org.quartz.JobDetail}</code>.
1039: * </p>
1040: *
1041: * @param newJob
1042: * The <code>JobDetail</code> to be stored.
1043: * @param replaceExisting
1044: * If <code>true</code>, any <code>Job</code> existing in the
1045: * <code>JobStore</code> with the same name & group should be
1046: * over-written.
1047: * @throws ObjectAlreadyExistsException
1048: * if a <code>Job</code> with the same name/group already
1049: * exists, and replaceExisting is set to false.
1050: */
1051: public void storeJob(final SchedulingContext ctxt,
1052: final JobDetail newJob, final boolean replaceExisting)
1053: throws ObjectAlreadyExistsException,
1054: JobPersistenceException {
1055: executeInLock(
1056: (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS
1057: : null, new VoidTransactionCallback() {
1058: public void execute(Connection conn)
1059: throws JobPersistenceException {
1060: storeJob(conn, ctxt, newJob, replaceExisting);
1061: }
1062: });
1063: }
1064:
1065: /**
1066: * <p>
1067: * Insert or update a job.
1068: * </p>
1069: */
1070: protected void storeJob(Connection conn, SchedulingContext ctxt,
1071: JobDetail newJob, boolean replaceExisting)
1072: throws ObjectAlreadyExistsException,
1073: JobPersistenceException {
1074: if (newJob.isVolatile() && isClustered()) {
1075: getLog()
1076: .info(
1077: "note: volatile jobs are effectively non-volatile in a clustered environment.");
1078: }
1079:
1080: boolean existingJob = jobExists(conn, newJob.getName(), newJob
1081: .getGroup());
1082: try {
1083: if (existingJob) {
1084: if (!replaceExisting) {
1085: throw new ObjectAlreadyExistsException(newJob);
1086: }
1087: getDelegate().updateJobDetail(conn, newJob);
1088: } else {
1089: getDelegate().insertJobDetail(conn, newJob);
1090: }
1091: } catch (IOException e) {
1092: throw new JobPersistenceException("Couldn't store job: "
1093: + e.getMessage(), e);
1094: } catch (SQLException e) {
1095: throw new JobPersistenceException("Couldn't store job: "
1096: + e.getMessage(), e);
1097: }
1098: }
1099:
1100: /**
1101: * <p>
1102: * Check existence of a given job.
1103: * </p>
1104: */
1105: protected boolean jobExists(Connection conn, String jobName,
1106: String groupName) throws JobPersistenceException {
1107: try {
1108: return getDelegate().jobExists(conn, jobName, groupName);
1109: } catch (SQLException e) {
1110: throw new JobPersistenceException(
1111: "Couldn't determine job existence (" + groupName
1112: + "." + jobName + "): " + e.getMessage(), e);
1113: }
1114: }
1115:
1116: /**
1117: * <p>
1118: * Store the given <code>{@link org.quartz.Trigger}</code>.
1119: * </p>
1120: *
1121: * @param newTrigger
1122: * The <code>Trigger</code> to be stored.
1123: * @param replaceExisting
1124: * If <code>true</code>, any <code>Trigger</code> existing in
1125: * the <code>JobStore</code> with the same name & group should
1126: * be over-written.
1127: * @throws ObjectAlreadyExistsException
1128: * if a <code>Trigger</code> with the same name/group already
1129: * exists, and replaceExisting is set to false.
1130: */
1131: public void storeTrigger(final SchedulingContext ctxt,
1132: final Trigger newTrigger, final boolean replaceExisting)
1133: throws ObjectAlreadyExistsException,
1134: JobPersistenceException {
1135: executeInLock(
1136: (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS
1137: : null, new VoidTransactionCallback() {
1138: public void execute(Connection conn)
1139: throws JobPersistenceException {
1140: storeTrigger(conn, ctxt, newTrigger, null,
1141: replaceExisting, STATE_WAITING, false,
1142: false);
1143: }
1144: });
1145: }
1146:
1147: /**
1148: * <p>
1149: * Insert or update a trigger.
1150: * </p>
1151: */
1152: protected void storeTrigger(Connection conn,
1153: SchedulingContext ctxt, Trigger newTrigger, JobDetail job,
1154: boolean replaceExisting, String state, boolean forceState,
1155: boolean recovering) throws ObjectAlreadyExistsException,
1156: JobPersistenceException {
1157: if (newTrigger.isVolatile() && isClustered()) {
1158: getLog()
1159: .info(
1160: "note: volatile triggers are effectively non-volatile in a clustered environment.");
1161: }
1162:
1163: boolean existingTrigger = triggerExists(conn, newTrigger
1164: .getName(), newTrigger.getGroup());
1165:
1166: if ((existingTrigger) && (!replaceExisting)) {
1167: throw new ObjectAlreadyExistsException(newTrigger);
1168: }
1169:
1170: try {
1171:
1172: boolean shouldBepaused = false;
1173:
1174: if (!forceState) {
1175: shouldBepaused = getDelegate().isTriggerGroupPaused(
1176: conn, newTrigger.getGroup());
1177:
1178: if (!shouldBepaused) {
1179: shouldBepaused = getDelegate()
1180: .isTriggerGroupPaused(conn,
1181: ALL_GROUPS_PAUSED);
1182:
1183: if (shouldBepaused) {
1184: getDelegate().insertPausedTriggerGroup(conn,
1185: newTrigger.getGroup());
1186: }
1187: }
1188:
1189: if (shouldBepaused
1190: && (state.equals(STATE_WAITING) || state
1191: .equals(STATE_ACQUIRED))) {
1192: state = STATE_PAUSED;
1193: }
1194: }
1195:
1196: if (job == null) {
1197: job = getDelegate().selectJobDetail(conn,
1198: newTrigger.getJobName(),
1199: newTrigger.getJobGroup(), getClassLoadHelper());
1200: }
1201: if (job == null) {
1202: throw new JobPersistenceException("The job ("
1203: + newTrigger.getFullJobName()
1204: + ") referenced by the trigger does not exist.");
1205: }
1206: if (job.isVolatile() && !newTrigger.isVolatile()) {
1207: throw new JobPersistenceException(
1208: "It does not make sense to "
1209: + "associate a non-volatile Trigger with a volatile Job!");
1210: }
1211:
1212: if (job.isStateful() && !recovering) {
1213: state = checkBlockedState(conn, ctxt, job.getName(),
1214: job.getGroup(), state);
1215: }
1216:
1217: if (existingTrigger) {
1218: if (newTrigger.getClass() == SimpleTrigger.class) {
1219: getDelegate().updateSimpleTrigger(conn,
1220: (SimpleTrigger) newTrigger);
1221: } else if (newTrigger.getClass() == CronTrigger.class) {
1222: getDelegate().updateCronTrigger(conn,
1223: (CronTrigger) newTrigger);
1224: } else {
1225: getDelegate().updateBlobTrigger(conn, newTrigger);
1226: }
1227: getDelegate().updateTrigger(conn, newTrigger, state,
1228: job);
1229: } else {
1230: getDelegate().insertTrigger(conn, newTrigger, state,
1231: job);
1232: if (newTrigger.getClass() == SimpleTrigger.class) {
1233: getDelegate().insertSimpleTrigger(conn,
1234: (SimpleTrigger) newTrigger);
1235: } else if (newTrigger.getClass() == CronTrigger.class) {
1236: getDelegate().insertCronTrigger(conn,
1237: (CronTrigger) newTrigger);
1238: } else {
1239: getDelegate().insertBlobTrigger(conn, newTrigger);
1240: }
1241: }
1242: } catch (Exception e) {
1243: throw new JobPersistenceException(
1244: "Couldn't store trigger: " + e.getMessage(), e);
1245: }
1246: }
1247:
1248: /**
1249: * <p>
1250: * Check existence of a given trigger.
1251: * </p>
1252: */
1253: protected boolean triggerExists(Connection conn,
1254: String triggerName, String groupName)
1255: throws JobPersistenceException {
1256: try {
1257: return getDelegate().triggerExists(conn, triggerName,
1258: groupName);
1259: } catch (SQLException e) {
1260: throw new JobPersistenceException(
1261: "Couldn't determine trigger existence ("
1262: + groupName + "." + triggerName + "): "
1263: + e.getMessage(), e);
1264: }
1265: }
1266:
1267: /**
1268: * <p>
1269: * Remove (delete) the <code>{@link org.quartz.Job}</code> with the given
1270: * name, and any <code>{@link org.quartz.Trigger}</code> s that reference
1271: * it.
1272: * </p>
1273: *
1274: * <p>
1275: * If removal of the <code>Job</code> results in an empty group, the
1276: * group should be removed from the <code>JobStore</code>'s list of
1277: * known group names.
1278: * </p>
1279: *
1280: * @param jobName
1281: * The name of the <code>Job</code> to be removed.
1282: * @param groupName
1283: * The group name of the <code>Job</code> to be removed.
1284: * @return <code>true</code> if a <code>Job</code> with the given name &
1285: * group was found and removed from the store.
1286: */
1287: public boolean removeJob(final SchedulingContext ctxt,
1288: final String jobName, final String groupName)
1289: throws JobPersistenceException {
1290: return ((Boolean) executeInLock(LOCK_TRIGGER_ACCESS,
1291: new TransactionCallback() {
1292: public Object execute(Connection conn)
1293: throws JobPersistenceException {
1294: return removeJob(conn, ctxt, jobName,
1295: groupName, true) ? Boolean.TRUE
1296: : Boolean.FALSE;
1297: }
1298: })).booleanValue();
1299: }
1300:
1301: protected boolean removeJob(Connection conn,
1302: SchedulingContext ctxt, String jobName, String groupName,
1303: boolean activeDeleteSafe) throws JobPersistenceException {
1304:
1305: try {
1306: Key[] jobTriggers = getDelegate().selectTriggerNamesForJob(
1307: conn, jobName, groupName);
1308: for (int i = 0; i < jobTriggers.length; ++i) {
1309: deleteTriggerAndChildren(conn,
1310: jobTriggers[i].getName(), jobTriggers[i]
1311: .getGroup());
1312: }
1313:
1314: return deleteJobAndChildren(conn, ctxt, jobName, groupName);
1315: } catch (SQLException e) {
1316: throw new JobPersistenceException("Couldn't remove job: "
1317: + e.getMessage(), e);
1318: }
1319: }
1320:
1321: /**
1322: * Delete a job and its listeners.
1323: *
1324: * @see #removeJob(Connection, SchedulingContext, String, String, boolean)
1325: * @see #removeTrigger(Connection, SchedulingContext, String, String)
1326: */
1327: private boolean deleteJobAndChildren(Connection conn,
1328: SchedulingContext ctxt, String jobName, String groupName)
1329: throws NoSuchDelegateException, SQLException {
1330: getDelegate().deleteJobListeners(conn, jobName, groupName);
1331:
1332: return (getDelegate().deleteJobDetail(conn, jobName, groupName) > 0);
1333: }
1334:
1335: /**
1336: * Delete a trigger, its listeners, and its Simple/Cron/BLOB sub-table entry.
1337: *
1338: * @see #removeJob(Connection, SchedulingContext, String, String, boolean)
1339: * @see #removeTrigger(Connection, SchedulingContext, String, String)
1340: * @see #replaceTrigger(Connection, SchedulingContext, String, String, Trigger)
1341: */
1342: private boolean deleteTriggerAndChildren(Connection conn,
1343: String triggerName, String triggerGroupName)
1344: throws SQLException, NoSuchDelegateException {
1345: DriverDelegate delegate = getDelegate();
1346:
1347: // Once it succeeds in deleting one sub-table entry it will not try the others.
1348: if ((delegate.deleteSimpleTrigger(conn, triggerName,
1349: triggerGroupName) == 0)
1350: && (delegate.deleteCronTrigger(conn, triggerName,
1351: triggerGroupName) == 0)) {
1352: delegate.deleteBlobTrigger(conn, triggerName,
1353: triggerGroupName);
1354: }
1355:
1356: delegate.deleteTriggerListeners(conn, triggerName,
1357: triggerGroupName);
1358:
1359: return (delegate.deleteTrigger(conn, triggerName,
1360: triggerGroupName) > 0);
1361: }
1362:
1363: /**
1364: * <p>
1365: * Retrieve the <code>{@link org.quartz.JobDetail}</code> for the given
1366: * <code>{@link org.quartz.Job}</code>.
1367: * </p>
1368: *
1369: * @param jobName
1370: * The name of the <code>Job</code> to be retrieved.
1371: * @param groupName
1372: * The group name of the <code>Job</code> to be retrieved.
1373: * @return The desired <code>Job</code>, or null if there is no match.
1374: */
1375: public JobDetail retrieveJob(final SchedulingContext ctxt,
1376: final String jobName, final String groupName)
1377: throws JobPersistenceException {
1378: return (JobDetail) executeWithoutLock( // no locks necessary for read...
1379: new TransactionCallback() {
1380: public Object execute(Connection conn)
1381: throws JobPersistenceException {
1382: return retrieveJob(conn, ctxt, jobName, groupName);
1383: }
1384: });
1385: }
1386:
1387: protected JobDetail retrieveJob(Connection conn,
1388: SchedulingContext ctxt, String jobName, String groupName)
1389: throws JobPersistenceException {
1390: try {
1391: JobDetail job = getDelegate().selectJobDetail(conn,
1392: jobName, groupName, getClassLoadHelper());
1393: if (job != null) {
1394: String[] listeners = getDelegate().selectJobListeners(
1395: conn, jobName, groupName);
1396: for (int i = 0; i < listeners.length; ++i) {
1397: job.addJobListener(listeners[i]);
1398: }
1399: }
1400:
1401: return job;
1402: } catch (ClassNotFoundException e) {
1403: throw new JobPersistenceException(
1404: "Couldn't retrieve job because a required class was not found: "
1405: + e.getMessage(),
1406: e,
1407: SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST);
1408: } catch (IOException e) {
1409: throw new JobPersistenceException(
1410: "Couldn't retrieve job because the BLOB couldn't be deserialized: "
1411: + e.getMessage(),
1412: e,
1413: SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST);
1414: } catch (SQLException e) {
1415: throw new JobPersistenceException("Couldn't retrieve job: "
1416: + e.getMessage(), e);
1417: }
1418: }
1419:
1420: /**
1421: * <p>
1422: * Remove (delete) the <code>{@link org.quartz.Trigger}</code> with the
1423: * given name.
1424: * </p>
1425: *
1426: * <p>
1427: * If removal of the <code>Trigger</code> results in an empty group, the
1428: * group should be removed from the <code>JobStore</code>'s list of
1429: * known group names.
1430: * </p>
1431: *
1432: * <p>
1433: * If removal of the <code>Trigger</code> results in an 'orphaned' <code>Job</code>
1434: * that is not 'durable', then the <code>Job</code> should be deleted
1435: * also.
1436: * </p>
1437: *
1438: * @param triggerName
1439: * The name of the <code>Trigger</code> to be removed.
1440: * @param groupName
1441: * The group name of the <code>Trigger</code> to be removed.
1442: * @return <code>true</code> if a <code>Trigger</code> with the given
1443: * name & group was found and removed from the store.
1444: */
1445: public boolean removeTrigger(final SchedulingContext ctxt,
1446: final String triggerName, final String groupName)
1447: throws JobPersistenceException {
1448: return ((Boolean) executeInLock(LOCK_TRIGGER_ACCESS,
1449: new TransactionCallback() {
1450: public Object execute(Connection conn)
1451: throws JobPersistenceException {
1452: return removeTrigger(conn, ctxt, triggerName,
1453: groupName) ? Boolean.TRUE
1454: : Boolean.FALSE;
1455: }
1456: })).booleanValue();
1457: }
1458:
1459: protected boolean removeTrigger(Connection conn,
1460: SchedulingContext ctxt, String triggerName, String groupName)
1461: throws JobPersistenceException {
1462: boolean removedTrigger = false;
1463: try {
1464: // this must be called before we delete the trigger, obviously
1465: JobDetail job = getDelegate().selectJobForTrigger(conn,
1466: triggerName, groupName, getClassLoadHelper());
1467:
1468: removedTrigger = deleteTriggerAndChildren(conn,
1469: triggerName, groupName);
1470:
1471: if (null != job && !job.isDurable()) {
1472: int numTriggers = getDelegate()
1473: .selectNumTriggersForJob(conn, job.getName(),
1474: job.getGroup());
1475: if (numTriggers == 0) {
1476: // Don't call removeJob() because we don't want to check for
1477: // triggers again.
1478: deleteJobAndChildren(conn, ctxt, job.getName(), job
1479: .getGroup());
1480: }
1481: }
1482: } catch (ClassNotFoundException e) {
1483: throw new JobPersistenceException(
1484: "Couldn't remove trigger: " + e.getMessage(), e);
1485: } catch (SQLException e) {
1486: throw new JobPersistenceException(
1487: "Couldn't remove trigger: " + e.getMessage(), e);
1488: }
1489:
1490: return removedTrigger;
1491: }
1492:
1493: /**
1494: * @see org.quartz.spi.JobStore#replaceTrigger(org.quartz.core.SchedulingContext, java.lang.String, java.lang.String, org.quartz.Trigger)
1495: */
1496: public boolean replaceTrigger(final SchedulingContext ctxt,
1497: final String triggerName, final String groupName,
1498: final Trigger newTrigger) throws JobPersistenceException {
1499: return ((Boolean) executeInLock(LOCK_TRIGGER_ACCESS,
1500: new TransactionCallback() {
1501: public Object execute(Connection conn)
1502: throws JobPersistenceException {
1503: return replaceTrigger(conn, ctxt, triggerName,
1504: groupName, newTrigger) ? Boolean.TRUE
1505: : Boolean.FALSE;
1506: }
1507: })).booleanValue();
1508: }
1509:
1510: protected boolean replaceTrigger(Connection conn,
1511: SchedulingContext ctxt, String triggerName,
1512: String groupName, Trigger newTrigger)
1513: throws JobPersistenceException {
1514: try {
1515: // this must be called before we delete the trigger, obviously
1516: JobDetail job = getDelegate().selectJobForTrigger(conn,
1517: triggerName, groupName, getClassLoadHelper());
1518:
1519: if (job == null) {
1520: return false;
1521: }
1522:
1523: if (!newTrigger.getJobName().equals(job.getName())
1524: || !newTrigger.getJobGroup().equals(job.getGroup())) {
1525: throw new JobPersistenceException(
1526: "New trigger is not related to the same job as the old trigger.");
1527: }
1528:
1529: boolean removedTrigger = deleteTriggerAndChildren(conn,
1530: triggerName, groupName);
1531:
1532: storeTrigger(conn, ctxt, newTrigger, job, false,
1533: STATE_WAITING, false, false);
1534:
1535: return removedTrigger;
1536: } catch (ClassNotFoundException e) {
1537: throw new JobPersistenceException(
1538: "Couldn't remove trigger: " + e.getMessage(), e);
1539: } catch (SQLException e) {
1540: throw new JobPersistenceException(
1541: "Couldn't remove trigger: " + e.getMessage(), e);
1542: }
1543: }
1544:
1545: /**
1546: * <p>
1547: * Retrieve the given <code>{@link org.quartz.Trigger}</code>.
1548: * </p>
1549: *
1550: * @param triggerName
1551: * The name of the <code>Trigger</code> to be retrieved.
1552: * @param groupName
1553: * The group name of the <code>Trigger</code> to be retrieved.
1554: * @return The desired <code>Trigger</code>, or null if there is no
1555: * match.
1556: */
1557: public Trigger retrieveTrigger(final SchedulingContext ctxt,
1558: final String triggerName, final String groupName)
1559: throws JobPersistenceException {
1560: return (Trigger) executeWithoutLock( // no locks necessary for read...
1561: new TransactionCallback() {
1562: public Object execute(Connection conn)
1563: throws JobPersistenceException {
1564: return retrieveTrigger(conn, ctxt, triggerName,
1565: groupName);
1566: }
1567: });
1568: }
1569:
1570: protected Trigger retrieveTrigger(Connection conn,
1571: SchedulingContext ctxt, String triggerName, String groupName)
1572: throws JobPersistenceException {
1573: return retrieveTrigger(conn, triggerName, groupName);
1574: }
1575:
1576: protected Trigger retrieveTrigger(Connection conn,
1577: String triggerName, String groupName)
1578: throws JobPersistenceException {
1579: try {
1580: Trigger trigger = getDelegate().selectTrigger(conn,
1581: triggerName, groupName);
1582: if (trigger == null) {
1583: return null;
1584: }
1585:
1586: // In case Trigger was BLOB, clear out any listeners that might
1587: // have been serialized.
1588: trigger.clearAllTriggerListeners();
1589:
1590: String[] listeners = getDelegate().selectTriggerListeners(
1591: conn, triggerName, groupName);
1592: for (int i = 0; i < listeners.length; ++i) {
1593: trigger.addTriggerListener(listeners[i]);
1594: }
1595:
1596: return trigger;
1597: } catch (Exception e) {
1598: throw new JobPersistenceException(
1599: "Couldn't retrieve trigger: " + e.getMessage(), e);
1600: }
1601: }
1602:
1603: /**
1604: * <p>
1605: * Get the current state of the identified <code>{@link Trigger}</code>.
1606: * </p>
1607: *
1608: * @see Trigger#STATE_NORMAL
1609: * @see Trigger#STATE_PAUSED
1610: * @see Trigger#STATE_COMPLETE
1611: * @see Trigger#STATE_ERROR
1612: * @see Trigger#STATE_NONE
1613: */
1614: public int getTriggerState(final SchedulingContext ctxt,
1615: final String triggerName, final String groupName)
1616: throws JobPersistenceException {
1617: return ((Integer) executeWithoutLock( // no locks necessary for read...
1618: new TransactionCallback() {
1619: public Object execute(Connection conn)
1620: throws JobPersistenceException {
1621: return new Integer(getTriggerState(conn, ctxt,
1622: triggerName, groupName));
1623: }
1624: })).intValue();
1625: }
1626:
1627: public int getTriggerState(Connection conn, SchedulingContext ctxt,
1628: String triggerName, String groupName)
1629: throws JobPersistenceException {
1630: try {
1631: String ts = getDelegate().selectTriggerState(conn,
1632: triggerName, groupName);
1633:
1634: if (ts == null) {
1635: return Trigger.STATE_NONE;
1636: }
1637:
1638: if (ts.equals(STATE_DELETED)) {
1639: return Trigger.STATE_NONE;
1640: }
1641:
1642: if (ts.equals(STATE_COMPLETE)) {
1643: return Trigger.STATE_COMPLETE;
1644: }
1645:
1646: if (ts.equals(STATE_PAUSED)) {
1647: return Trigger.STATE_PAUSED;
1648: }
1649:
1650: if (ts.equals(STATE_PAUSED_BLOCKED)) {
1651: return Trigger.STATE_PAUSED;
1652: }
1653:
1654: if (ts.equals(STATE_ERROR)) {
1655: return Trigger.STATE_ERROR;
1656: }
1657:
1658: if (ts.equals(STATE_BLOCKED)) {
1659: return Trigger.STATE_BLOCKED;
1660: }
1661:
1662: return Trigger.STATE_NORMAL;
1663:
1664: } catch (SQLException e) {
1665: throw new JobPersistenceException(
1666: "Couldn't determine state of trigger (" + groupName
1667: + "." + triggerName + "): "
1668: + e.getMessage(), e);
1669: }
1670: }
1671:
1672: /**
1673: * <p>
1674: * Store the given <code>{@link org.quartz.Calendar}</code>.
1675: * </p>
1676: *
1677: * @param calName
1678: * The name of the calendar.
1679: * @param calendar
1680: * The <code>Calendar</code> to be stored.
1681: * @param replaceExisting
1682: * If <code>true</code>, any <code>Calendar</code> existing
1683: * in the <code>JobStore</code> with the same name & group
1684: * should be over-written.
1685: * @throws ObjectAlreadyExistsException
1686: * if a <code>Calendar</code> with the same name already
1687: * exists, and replaceExisting is set to false.
1688: */
1689: public void storeCalendar(final SchedulingContext ctxt,
1690: final String calName, final Calendar calendar,
1691: final boolean replaceExisting, final boolean updateTriggers)
1692: throws ObjectAlreadyExistsException,
1693: JobPersistenceException {
1694: executeInLock(
1695: (isLockOnInsert() || updateTriggers) ? LOCK_TRIGGER_ACCESS
1696: : null, new VoidTransactionCallback() {
1697: public void execute(Connection conn)
1698: throws JobPersistenceException {
1699: storeCalendar(conn, ctxt, calName, calendar,
1700: replaceExisting, updateTriggers);
1701: }
1702: });
1703: }
1704:
1705: protected void storeCalendar(Connection conn,
1706: SchedulingContext ctxt, String calName, Calendar calendar,
1707: boolean replaceExisting, boolean updateTriggers)
1708: throws ObjectAlreadyExistsException,
1709: JobPersistenceException {
1710: try {
1711: boolean existingCal = calendarExists(conn, calName);
1712: if (existingCal && !replaceExisting) {
1713: throw new ObjectAlreadyExistsException(
1714: "Calendar with name '" + calName
1715: + "' already exists.");
1716: }
1717:
1718: if (existingCal) {
1719: if (getDelegate().updateCalendar(conn, calName,
1720: calendar) < 1) {
1721: throw new JobPersistenceException(
1722: "Couldn't store calendar. Update failed.");
1723: }
1724:
1725: if (updateTriggers) {
1726: Trigger[] trigs = getDelegate()
1727: .selectTriggersForCalendar(conn, calName);
1728:
1729: for (int i = 0; i < trigs.length; i++) {
1730: trigs[i].updateWithNewCalendar(calendar,
1731: getMisfireThreshold());
1732: storeTrigger(conn, ctxt, trigs[i], null, true,
1733: STATE_WAITING, false, false);
1734: }
1735: }
1736: } else {
1737: if (getDelegate().insertCalendar(conn, calName,
1738: calendar) < 1) {
1739: throw new JobPersistenceException(
1740: "Couldn't store calendar. Insert failed.");
1741: }
1742: }
1743:
1744: if (isClustered == false) {
1745: calendarCache.put(calName, calendar); // lazy-cache
1746: }
1747:
1748: } catch (IOException e) {
1749: throw new JobPersistenceException(
1750: "Couldn't store calendar because the BLOB couldn't be serialized: "
1751: + e.getMessage(), e);
1752: } catch (ClassNotFoundException e) {
1753: throw new JobPersistenceException(
1754: "Couldn't store calendar: " + e.getMessage(), e);
1755: } catch (SQLException e) {
1756: throw new JobPersistenceException(
1757: "Couldn't store calendar: " + e.getMessage(), e);
1758: }
1759: }
1760:
1761: protected boolean calendarExists(Connection conn, String calName)
1762: throws JobPersistenceException {
1763: try {
1764: return getDelegate().calendarExists(conn, calName);
1765: } catch (SQLException e) {
1766: throw new JobPersistenceException(
1767: "Couldn't determine calendar existence (" + calName
1768: + "): " + e.getMessage(), e);
1769: }
1770: }
1771:
1772: /**
1773: * <p>
1774: * Remove (delete) the <code>{@link org.quartz.Calendar}</code> with the
1775: * given name.
1776: * </p>
1777: *
1778: * <p>
1779: * If removal of the <code>Calendar</code> would result in
1780: * <code.Trigger</code>s pointing to non-existent calendars, then a
1781: * <code>JobPersistenceException</code> will be thrown.</p>
1782: * *
1783: * @param calName The name of the <code>Calendar</code> to be removed.
1784: * @return <code>true</code> if a <code>Calendar</code> with the given name
1785: * was found and removed from the store.
1786: */
1787: public boolean removeCalendar(final SchedulingContext ctxt,
1788: final String calName) throws JobPersistenceException {
1789: return ((Boolean) executeInLock(LOCK_TRIGGER_ACCESS,
1790: new TransactionCallback() {
1791: public Object execute(Connection conn)
1792: throws JobPersistenceException {
1793: return removeCalendar(conn, ctxt, calName) ? Boolean.TRUE
1794: : Boolean.FALSE;
1795: }
1796: })).booleanValue();
1797: }
1798:
1799: protected boolean removeCalendar(Connection conn,
1800: SchedulingContext ctxt, String calName)
1801: throws JobPersistenceException {
1802: try {
1803: if (getDelegate().calendarIsReferenced(conn, calName)) {
1804: throw new JobPersistenceException(
1805: "Calender cannot be removed if it referenced by a trigger!");
1806: }
1807:
1808: if (isClustered == false) {
1809: calendarCache.remove(calName);
1810: }
1811:
1812: return (getDelegate().deleteCalendar(conn, calName) > 0);
1813: } catch (SQLException e) {
1814: throw new JobPersistenceException(
1815: "Couldn't remove calendar: " + e.getMessage(), e);
1816: }
1817: }
1818:
1819: /**
1820: * <p>
1821: * Retrieve the given <code>{@link org.quartz.Trigger}</code>.
1822: * </p>
1823: *
1824: * @param calName
1825: * The name of the <code>Calendar</code> to be retrieved.
1826: * @return The desired <code>Calendar</code>, or null if there is no
1827: * match.
1828: */
1829: public Calendar retrieveCalendar(final SchedulingContext ctxt,
1830: final String calName) throws JobPersistenceException {
1831: return (Calendar) executeWithoutLock( // no locks necessary for read...
1832: new TransactionCallback() {
1833: public Object execute(Connection conn)
1834: throws JobPersistenceException {
1835: return retrieveCalendar(conn, ctxt, calName);
1836: }
1837: });
1838: }
1839:
1840: protected Calendar retrieveCalendar(Connection conn,
1841: SchedulingContext ctxt, String calName)
1842: throws JobPersistenceException {
1843: // all calendars are persistent, but we can lazy-cache them during run
1844: // time as long as we aren't running clustered.
1845: Calendar cal = (isClustered) ? null : (Calendar) calendarCache
1846: .get(calName);
1847: if (cal != null) {
1848: return cal;
1849: }
1850:
1851: try {
1852: cal = getDelegate().selectCalendar(conn, calName);
1853: if (isClustered == false) {
1854: calendarCache.put(calName, cal); // lazy-cache...
1855: }
1856: return cal;
1857: } catch (ClassNotFoundException e) {
1858: throw new JobPersistenceException(
1859: "Couldn't retrieve calendar because a required class was not found: "
1860: + e.getMessage(), e);
1861: } catch (IOException e) {
1862: throw new JobPersistenceException(
1863: "Couldn't retrieve calendar because the BLOB couldn't be deserialized: "
1864: + e.getMessage(), e);
1865: } catch (SQLException e) {
1866: throw new JobPersistenceException(
1867: "Couldn't retrieve calendar: " + e.getMessage(), e);
1868: }
1869: }
1870:
1871: /**
1872: * <p>
1873: * Get the number of <code>{@link org.quartz.Job}</code> s that are
1874: * stored in the <code>JobStore</code>.
1875: * </p>
1876: */
1877: public int getNumberOfJobs(final SchedulingContext ctxt)
1878: throws JobPersistenceException {
1879: return ((Integer) executeWithoutLock( // no locks necessary for read...
1880: new TransactionCallback() {
1881: public Object execute(Connection conn)
1882: throws JobPersistenceException {
1883: return new Integer(getNumberOfJobs(conn, ctxt));
1884: }
1885: })).intValue();
1886: }
1887:
1888: protected int getNumberOfJobs(Connection conn,
1889: SchedulingContext ctxt) throws JobPersistenceException {
1890: try {
1891: return getDelegate().selectNumJobs(conn);
1892: } catch (SQLException e) {
1893: throw new JobPersistenceException(
1894: "Couldn't obtain number of jobs: " + e.getMessage(),
1895: e);
1896: }
1897: }
1898:
1899: /**
1900: * <p>
1901: * Get the number of <code>{@link org.quartz.Trigger}</code> s that are
1902: * stored in the <code>JobsStore</code>.
1903: * </p>
1904: */
1905: public int getNumberOfTriggers(final SchedulingContext ctxt)
1906: throws JobPersistenceException {
1907: return ((Integer) executeWithoutLock( // no locks necessary for read...
1908: new TransactionCallback() {
1909: public Object execute(Connection conn)
1910: throws JobPersistenceException {
1911: return new Integer(getNumberOfTriggers(conn, ctxt));
1912: }
1913: })).intValue();
1914: }
1915:
1916: protected int getNumberOfTriggers(Connection conn,
1917: SchedulingContext ctxt) throws JobPersistenceException {
1918: try {
1919: return getDelegate().selectNumTriggers(conn);
1920: } catch (SQLException e) {
1921: throw new JobPersistenceException(
1922: "Couldn't obtain number of triggers: "
1923: + e.getMessage(), e);
1924: }
1925: }
1926:
1927: /**
1928: * <p>
1929: * Get the number of <code>{@link org.quartz.Calendar}</code> s that are
1930: * stored in the <code>JobsStore</code>.
1931: * </p>
1932: */
1933: public int getNumberOfCalendars(final SchedulingContext ctxt)
1934: throws JobPersistenceException {
1935: return ((Integer) executeWithoutLock( // no locks necessary for read...
1936: new TransactionCallback() {
1937: public Object execute(Connection conn)
1938: throws JobPersistenceException {
1939: return new Integer(getNumberOfCalendars(conn, ctxt));
1940: }
1941: })).intValue();
1942: }
1943:
1944: protected int getNumberOfCalendars(Connection conn,
1945: SchedulingContext ctxt) throws JobPersistenceException {
1946: try {
1947: return getDelegate().selectNumCalendars(conn);
1948: } catch (SQLException e) {
1949: throw new JobPersistenceException(
1950: "Couldn't obtain number of calendars: "
1951: + e.getMessage(), e);
1952: }
1953: }
1954:
1955: /**
1956: * <p>
1957: * Get the names of all of the <code>{@link org.quartz.Job}</code> s that
1958: * have the given group name.
1959: * </p>
1960: *
1961: * <p>
1962: * If there are no jobs in the given group name, the result should be a
1963: * zero-length array (not <code>null</code>).
1964: * </p>
1965: */
1966: public String[] getJobNames(final SchedulingContext ctxt,
1967: final String groupName) throws JobPersistenceException {
1968: return (String[]) executeWithoutLock( // no locks necessary for read...
1969: new TransactionCallback() {
1970: public Object execute(Connection conn)
1971: throws JobPersistenceException {
1972: return getJobNames(conn, ctxt, groupName);
1973: }
1974: });
1975: }
1976:
1977: protected String[] getJobNames(Connection conn,
1978: SchedulingContext ctxt, String groupName)
1979: throws JobPersistenceException {
1980: String[] jobNames = null;
1981:
1982: try {
1983: jobNames = getDelegate().selectJobsInGroup(conn, groupName);
1984: } catch (SQLException e) {
1985: throw new JobPersistenceException(
1986: "Couldn't obtain job names: " + e.getMessage(), e);
1987: }
1988:
1989: return jobNames;
1990: }
1991:
1992: /**
1993: * <p>
1994: * Get the names of all of the <code>{@link org.quartz.Trigger}</code> s
1995: * that have the given group name.
1996: * </p>
1997: *
1998: * <p>
1999: * If there are no triggers in the given group name, the result should be a
2000: * zero-length array (not <code>null</code>).
2001: * </p>
2002: */
2003: public String[] getTriggerNames(final SchedulingContext ctxt,
2004: final String groupName) throws JobPersistenceException {
2005: return (String[]) executeWithoutLock( // no locks necessary for read...
2006: new TransactionCallback() {
2007: public Object execute(Connection conn)
2008: throws JobPersistenceException {
2009: return getTriggerNames(conn, ctxt, groupName);
2010: }
2011: });
2012: }
2013:
2014: protected String[] getTriggerNames(Connection conn,
2015: SchedulingContext ctxt, String groupName)
2016: throws JobPersistenceException {
2017:
2018: String[] trigNames = null;
2019:
2020: try {
2021: trigNames = getDelegate().selectTriggersInGroup(conn,
2022: groupName);
2023: } catch (SQLException e) {
2024: throw new JobPersistenceException(
2025: "Couldn't obtain trigger names: " + e.getMessage(),
2026: e);
2027: }
2028:
2029: return trigNames;
2030: }
2031:
2032: /**
2033: * <p>
2034: * Get the names of all of the <code>{@link org.quartz.Job}</code>
2035: * groups.
2036: * </p>
2037: *
2038: * <p>
2039: * If there are no known group names, the result should be a zero-length
2040: * array (not <code>null</code>).
2041: * </p>
2042: */
2043: public String[] getJobGroupNames(final SchedulingContext ctxt)
2044: throws JobPersistenceException {
2045: return (String[]) executeWithoutLock( // no locks necessary for read...
2046: new TransactionCallback() {
2047: public Object execute(Connection conn)
2048: throws JobPersistenceException {
2049: return getJobGroupNames(conn, ctxt);
2050: }
2051: });
2052: }
2053:
2054: protected String[] getJobGroupNames(Connection conn,
2055: SchedulingContext ctxt) throws JobPersistenceException {
2056:
2057: String[] groupNames = null;
2058:
2059: try {
2060: groupNames = getDelegate().selectJobGroups(conn);
2061: } catch (SQLException e) {
2062: throw new JobPersistenceException(
2063: "Couldn't obtain job groups: " + e.getMessage(), e);
2064: }
2065:
2066: return groupNames;
2067: }
2068:
2069: /**
2070: * <p>
2071: * Get the names of all of the <code>{@link org.quartz.Trigger}</code>
2072: * groups.
2073: * </p>
2074: *
2075: * <p>
2076: * If there are no known group names, the result should be a zero-length
2077: * array (not <code>null</code>).
2078: * </p>
2079: */
2080: public String[] getTriggerGroupNames(final SchedulingContext ctxt)
2081: throws JobPersistenceException {
2082: return (String[]) executeWithoutLock( // no locks necessary for read...
2083: new TransactionCallback() {
2084: public Object execute(Connection conn)
2085: throws JobPersistenceException {
2086: return getTriggerGroupNames(conn, ctxt);
2087: }
2088: });
2089: }
2090:
2091: protected String[] getTriggerGroupNames(Connection conn,
2092: SchedulingContext ctxt) throws JobPersistenceException {
2093:
2094: String[] groupNames = null;
2095:
2096: try {
2097: groupNames = getDelegate().selectTriggerGroups(conn);
2098: } catch (SQLException e) {
2099: throw new JobPersistenceException(
2100: "Couldn't obtain trigger groups: " + e.getMessage(),
2101: e);
2102: }
2103:
2104: return groupNames;
2105: }
2106:
2107: /**
2108: * <p>
2109: * Get the names of all of the <code>{@link org.quartz.Calendar}</code> s
2110: * in the <code>JobStore</code>.
2111: * </p>
2112: *
2113: * <p>
2114: * If there are no Calendars in the given group name, the result should be
2115: * a zero-length array (not <code>null</code>).
2116: * </p>
2117: */
2118: public String[] getCalendarNames(final SchedulingContext ctxt)
2119: throws JobPersistenceException {
2120: return (String[]) executeWithoutLock( // no locks necessary for read...
2121: new TransactionCallback() {
2122: public Object execute(Connection conn)
2123: throws JobPersistenceException {
2124: return getCalendarNames(conn, ctxt);
2125: }
2126: });
2127: }
2128:
2129: protected String[] getCalendarNames(Connection conn,
2130: SchedulingContext ctxt) throws JobPersistenceException {
2131: try {
2132: return getDelegate().selectCalendars(conn);
2133: } catch (SQLException e) {
2134: throw new JobPersistenceException(
2135: "Couldn't obtain trigger groups: " + e.getMessage(),
2136: e);
2137: }
2138: }
2139:
2140: /**
2141: * <p>
2142: * Get all of the Triggers that are associated to the given Job.
2143: * </p>
2144: *
2145: * <p>
2146: * If there are no matches, a zero-length array should be returned.
2147: * </p>
2148: */
2149: public Trigger[] getTriggersForJob(final SchedulingContext ctxt,
2150: final String jobName, final String groupName)
2151: throws JobPersistenceException {
2152: return (Trigger[]) executeWithoutLock( // no locks necessary for read...
2153: new TransactionCallback() {
2154: public Object execute(Connection conn)
2155: throws JobPersistenceException {
2156: return getTriggersForJob(conn, ctxt, jobName, groupName);
2157: }
2158: });
2159: }
2160:
2161: protected Trigger[] getTriggersForJob(Connection conn,
2162: SchedulingContext ctxt, String jobName, String groupName)
2163: throws JobPersistenceException {
2164: Trigger[] array = null;
2165:
2166: try {
2167: array = getDelegate().selectTriggersForJob(conn, jobName,
2168: groupName);
2169: } catch (Exception e) {
2170: throw new JobPersistenceException(
2171: "Couldn't obtain triggers for job: "
2172: + e.getMessage(), e);
2173: }
2174:
2175: return array;
2176: }
2177:
2178: /**
2179: * <p>
2180: * Pause the <code>{@link org.quartz.Trigger}</code> with the given name.
2181: * </p>
2182: *
2183: * @see #resumeTrigger(SchedulingContext, String, String)
2184: */
2185: public void pauseTrigger(final SchedulingContext ctxt,
2186: final String triggerName, final String groupName)
2187: throws JobPersistenceException {
2188: executeInLock(LOCK_TRIGGER_ACCESS,
2189: new VoidTransactionCallback() {
2190: public void execute(Connection conn)
2191: throws JobPersistenceException {
2192: pauseTrigger(conn, ctxt, triggerName, groupName);
2193: }
2194: });
2195: }
2196:
2197: /**
2198: * <p>
2199: * Pause the <code>{@link org.quartz.Trigger}</code> with the given name.
2200: * </p>
2201: *
2202: * @see #resumeTrigger(Connection, SchedulingContext, String, String)
2203: */
2204: public void pauseTrigger(Connection conn, SchedulingContext ctxt,
2205: String triggerName, String groupName)
2206: throws JobPersistenceException {
2207:
2208: try {
2209: String oldState = getDelegate().selectTriggerState(conn,
2210: triggerName, groupName);
2211:
2212: if (oldState.equals(STATE_WAITING)
2213: || oldState.equals(STATE_ACQUIRED)) {
2214:
2215: getDelegate().updateTriggerState(conn, triggerName,
2216: groupName, STATE_PAUSED);
2217: } else if (oldState.equals(STATE_BLOCKED)) {
2218: getDelegate().updateTriggerState(conn, triggerName,
2219: groupName, STATE_PAUSED_BLOCKED);
2220: }
2221: } catch (SQLException e) {
2222: throw new JobPersistenceException(
2223: "Couldn't pause trigger '" + groupName + "."
2224: + triggerName + "': " + e.getMessage(), e);
2225: }
2226: }
2227:
2228: /**
2229: * <p>
2230: * Pause the <code>{@link org.quartz.Job}</code> with the given name - by
2231: * pausing all of its current <code>Trigger</code>s.
2232: * </p>
2233: *
2234: * @see #resumeJob(SchedulingContext, String, String)
2235: */
2236: public void pauseJob(final SchedulingContext ctxt,
2237: final String jobName, final String groupName)
2238: throws JobPersistenceException {
2239: executeInLock(LOCK_TRIGGER_ACCESS,
2240: new VoidTransactionCallback() {
2241: public void execute(Connection conn)
2242: throws JobPersistenceException {
2243: Trigger[] triggers = getTriggersForJob(conn,
2244: ctxt, jobName, groupName);
2245: for (int j = 0; j < triggers.length; j++) {
2246: pauseTrigger(conn, ctxt, triggers[j]
2247: .getName(), triggers[j].getGroup());
2248: }
2249: }
2250: });
2251: }
2252:
2253: /**
2254: * <p>
2255: * Pause all of the <code>{@link org.quartz.Job}s</code> in the given
2256: * group - by pausing all of their <code>Trigger</code>s.
2257: * </p>
2258: *
2259: * @see #resumeJobGroup(SchedulingContext, String)
2260: */
2261: public void pauseJobGroup(final SchedulingContext ctxt,
2262: final String groupName) throws JobPersistenceException {
2263: executeInLock(LOCK_TRIGGER_ACCESS,
2264: new VoidTransactionCallback() {
2265: public void execute(Connection conn)
2266: throws JobPersistenceException {
2267: String[] jobNames = getJobNames(conn, ctxt,
2268: groupName);
2269:
2270: for (int i = 0; i < jobNames.length; i++) {
2271: Trigger[] triggers = getTriggersForJob(
2272: conn, ctxt, jobNames[i], groupName);
2273: for (int j = 0; j < triggers.length; j++) {
2274: pauseTrigger(conn, ctxt, triggers[j]
2275: .getName(), triggers[j]
2276: .getGroup());
2277: }
2278: }
2279: }
2280: });
2281: }
2282:
2283: /**
2284: * Determines if a Trigger for the given job should be blocked.
2285: * State can only transition to STATE_PAUSED_BLOCKED/STATE_BLOCKED from
2286: * STATE_PAUSED/STATE_WAITING respectively.
2287: *
2288: * @return STATE_PAUSED_BLOCKED, STATE_BLOCKED, or the currentState.
2289: */
2290: protected String checkBlockedState(Connection conn,
2291: SchedulingContext ctxt, String jobName,
2292: String jobGroupName, String currentState)
2293: throws JobPersistenceException {
2294:
2295: // State can only transition to BLOCKED from PAUSED or WAITING.
2296: if ((currentState.equals(STATE_WAITING) == false)
2297: && (currentState.equals(STATE_PAUSED) == false)) {
2298: return currentState;
2299: }
2300:
2301: try {
2302: List lst = getDelegate().selectFiredTriggerRecordsByJob(
2303: conn, jobName, jobGroupName);
2304:
2305: if (lst.size() > 0) {
2306: FiredTriggerRecord rec = (FiredTriggerRecord) lst
2307: .get(0);
2308: if (rec.isJobIsStateful()) { // TODO: worry about
2309: // failed/recovering/volatile job
2310: // states?
2311: return (STATE_PAUSED.equals(currentState)) ? STATE_PAUSED_BLOCKED
2312: : STATE_BLOCKED;
2313: }
2314: }
2315:
2316: return currentState;
2317: } catch (SQLException e) {
2318: throw new JobPersistenceException(
2319: "Couldn't determine if trigger should be in a blocked state '"
2320: + jobGroupName + "." + jobName + "': "
2321: + e.getMessage(), e);
2322: }
2323:
2324: }
2325:
2326: /*
2327: * private List findTriggersToBeBlocked(Connection conn, SchedulingContext
2328: * ctxt, String groupName) throws JobPersistenceException {
2329: *
2330: * try { List blockList = new LinkedList();
2331: *
2332: * List affectingJobs =
2333: * getDelegate().selectStatefulJobsOfTriggerGroup(conn, groupName);
2334: *
2335: * Iterator itr = affectingJobs.iterator(); while(itr.hasNext()) { Key
2336: * jobKey = (Key) itr.next();
2337: *
2338: * List lst = getDelegate().selectFiredTriggerRecordsByJob(conn,
2339: * jobKey.getName(), jobKey.getGroup());
2340: *
2341: * This logic is BROKEN...
2342: *
2343: * if(lst.size() > 0) { FiredTriggerRecord rec =
2344: * (FiredTriggerRecord)lst.get(0); if(rec.isJobIsStateful()) // TODO: worry
2345: * about failed/recovering/volatile job states? blockList.add(
2346: * rec.getTriggerKey() ); } }
2347: *
2348: *
2349: * return blockList; } catch (SQLException e) { throw new
2350: * JobPersistenceException ("Couldn't determine states of resumed triggers
2351: * in group '" + groupName + "': " + e.getMessage(), e); } }
2352: */
2353:
2354: /**
2355: * <p>
2356: * Resume (un-pause) the <code>{@link org.quartz.Trigger}</code> with the
2357: * given name.
2358: * </p>
2359: *
2360: * <p>
2361: * If the <code>Trigger</code> missed one or more fire-times, then the
2362: * <code>Trigger</code>'s misfire instruction will be applied.
2363: * </p>
2364: *
2365: * @see #pauseTrigger(SchedulingContext, String, String)
2366: */
2367: public void resumeTrigger(final SchedulingContext ctxt,
2368: final String triggerName, final String groupName)
2369: throws JobPersistenceException {
2370: executeInLock(LOCK_TRIGGER_ACCESS,
2371: new VoidTransactionCallback() {
2372: public void execute(Connection conn)
2373: throws JobPersistenceException {
2374: resumeTrigger(conn, ctxt, triggerName,
2375: groupName);
2376: }
2377: });
2378: }
2379:
2380: /**
2381: * <p>
2382: * Resume (un-pause) the <code>{@link org.quartz.Trigger}</code> with the
2383: * given name.
2384: * </p>
2385: *
2386: * <p>
2387: * If the <code>Trigger</code> missed one or more fire-times, then the
2388: * <code>Trigger</code>'s misfire instruction will be applied.
2389: * </p>
2390: *
2391: * @see #pauseTrigger(Connection, SchedulingContext, String, String)
2392: */
2393: public void resumeTrigger(Connection conn, SchedulingContext ctxt,
2394: String triggerName, String groupName)
2395: throws JobPersistenceException {
2396: try {
2397:
2398: TriggerStatus status = getDelegate().selectTriggerStatus(
2399: conn, triggerName, groupName);
2400:
2401: if (status == null || status.getNextFireTime() == null) {
2402: return;
2403: }
2404:
2405: boolean blocked = false;
2406: if (STATE_PAUSED_BLOCKED.equals(status.getStatus())) {
2407: blocked = true;
2408: }
2409:
2410: String newState = checkBlockedState(conn, ctxt, status
2411: .getJobKey().getName(), status.getJobKey()
2412: .getGroup(), STATE_WAITING);
2413:
2414: boolean misfired = false;
2415:
2416: if (status.getNextFireTime().before(new Date())) {
2417: misfired = updateMisfiredTrigger(conn, ctxt,
2418: triggerName, groupName, newState, true);
2419: }
2420:
2421: if (!misfired) {
2422: if (blocked) {
2423: getDelegate().updateTriggerStateFromOtherState(
2424: conn, triggerName, groupName, newState,
2425: STATE_PAUSED_BLOCKED);
2426: } else {
2427: getDelegate().updateTriggerStateFromOtherState(
2428: conn, triggerName, groupName, newState,
2429: STATE_PAUSED);
2430: }
2431: }
2432:
2433: } catch (SQLException e) {
2434: throw new JobPersistenceException(
2435: "Couldn't resume trigger '" + groupName + "."
2436: + triggerName + "': " + e.getMessage(), e);
2437: }
2438: }
2439:
2440: /**
2441: * <p>
2442: * Resume (un-pause) the <code>{@link org.quartz.Job}</code> with the
2443: * given name.
2444: * </p>
2445: *
2446: * <p>
2447: * If any of the <code>Job</code>'s<code>Trigger</code> s missed one
2448: * or more fire-times, then the <code>Trigger</code>'s misfire
2449: * instruction will be applied.
2450: * </p>
2451: *
2452: * @see #pauseJob(SchedulingContext, String, String)
2453: */
2454: public void resumeJob(final SchedulingContext ctxt,
2455: final String jobName, final String groupName)
2456: throws JobPersistenceException {
2457: executeInLock(LOCK_TRIGGER_ACCESS,
2458: new VoidTransactionCallback() {
2459: public void execute(Connection conn)
2460: throws JobPersistenceException {
2461: Trigger[] triggers = getTriggersForJob(conn,
2462: ctxt, jobName, groupName);
2463: for (int j = 0; j < triggers.length; j++) {
2464: resumeTrigger(conn, ctxt, triggers[j]
2465: .getName(), triggers[j].getGroup());
2466: }
2467: }
2468: });
2469: }
2470:
2471: /**
2472: * <p>
2473: * Resume (un-pause) all of the <code>{@link org.quartz.Job}s</code> in
2474: * the given group.
2475: * </p>
2476: *
2477: * <p>
2478: * If any of the <code>Job</code> s had <code>Trigger</code> s that
2479: * missed one or more fire-times, then the <code>Trigger</code>'s
2480: * misfire instruction will be applied.
2481: * </p>
2482: *
2483: * @see #pauseJobGroup(SchedulingContext, String)
2484: */
2485: public void resumeJobGroup(final SchedulingContext ctxt,
2486: final String groupName) throws JobPersistenceException {
2487: executeInLock(LOCK_TRIGGER_ACCESS,
2488: new VoidTransactionCallback() {
2489: public void execute(Connection conn)
2490: throws JobPersistenceException {
2491: String[] jobNames = getJobNames(conn, ctxt,
2492: groupName);
2493:
2494: for (int i = 0; i < jobNames.length; i++) {
2495: Trigger[] triggers = getTriggersForJob(
2496: conn, ctxt, jobNames[i], groupName);
2497: for (int j = 0; j < triggers.length; j++) {
2498: resumeTrigger(conn, ctxt, triggers[j]
2499: .getName(), triggers[j]
2500: .getGroup());
2501: }
2502: }
2503: }
2504: });
2505: }
2506:
2507: /**
2508: * <p>
2509: * Pause all of the <code>{@link org.quartz.Trigger}s</code> in the
2510: * given group.
2511: * </p>
2512: *
2513: * @see #resumeTriggerGroup(SchedulingContext, String)
2514: */
2515: public void pauseTriggerGroup(final SchedulingContext ctxt,
2516: final String groupName) throws JobPersistenceException {
2517: executeInLock(LOCK_TRIGGER_ACCESS,
2518: new VoidTransactionCallback() {
2519: public void execute(Connection conn)
2520: throws JobPersistenceException {
2521: pauseTriggerGroup(conn, ctxt, groupName);
2522: }
2523: });
2524: }
2525:
2526: /**
2527: * <p>
2528: * Pause all of the <code>{@link org.quartz.Trigger}s</code> in the
2529: * given group.
2530: * </p>
2531: *
2532: * @see #resumeTriggerGroup(Connection, SchedulingContext, String)
2533: */
2534: public void pauseTriggerGroup(Connection conn,
2535: SchedulingContext ctxt, String groupName)
2536: throws JobPersistenceException {
2537:
2538: try {
2539:
2540: getDelegate().updateTriggerGroupStateFromOtherStates(conn,
2541: groupName, STATE_PAUSED, STATE_ACQUIRED,
2542: STATE_WAITING, STATE_WAITING);
2543:
2544: getDelegate().updateTriggerGroupStateFromOtherState(conn,
2545: groupName, STATE_PAUSED_BLOCKED, STATE_BLOCKED);
2546:
2547: if (!getDelegate().isTriggerGroupPaused(conn, groupName)) {
2548: getDelegate().insertPausedTriggerGroup(conn, groupName);
2549: }
2550:
2551: } catch (SQLException e) {
2552: throw new JobPersistenceException(
2553: "Couldn't pause trigger group '" + groupName
2554: + "': " + e.getMessage(), e);
2555: }
2556: }
2557:
2558: public Set getPausedTriggerGroups(final SchedulingContext ctxt)
2559: throws JobPersistenceException {
2560: return (Set) executeWithoutLock( // no locks necessary for read...
2561: new TransactionCallback() {
2562: public Object execute(Connection conn)
2563: throws JobPersistenceException {
2564: return getPausedTriggerGroups(conn, ctxt);
2565: }
2566: });
2567: }
2568:
2569: /**
2570: * <p>
2571: * Pause all of the <code>{@link org.quartz.Trigger}s</code> in the
2572: * given group.
2573: * </p>
2574: *
2575: * @see #resumeTriggerGroup(Connection, SchedulingContext, String)
2576: */
2577: public Set getPausedTriggerGroups(Connection conn,
2578: SchedulingContext ctxt) throws JobPersistenceException {
2579:
2580: try {
2581: return getDelegate().selectPausedTriggerGroups(conn);
2582: } catch (SQLException e) {
2583: throw new JobPersistenceException(
2584: "Couldn't determine paused trigger groups: "
2585: + e.getMessage(), e);
2586: }
2587: }
2588:
2589: /**
2590: * <p>
2591: * Resume (un-pause) all of the <code>{@link org.quartz.Trigger}s</code>
2592: * in the given group.
2593: * </p>
2594: *
2595: * <p>
2596: * If any <code>Trigger</code> missed one or more fire-times, then the
2597: * <code>Trigger</code>'s misfire instruction will be applied.
2598: * </p>
2599: *
2600: * @see #pauseTriggerGroup(SchedulingContext, String)
2601: */
2602: public void resumeTriggerGroup(final SchedulingContext ctxt,
2603: final String groupName) throws JobPersistenceException {
2604: executeInLock(LOCK_TRIGGER_ACCESS,
2605: new VoidTransactionCallback() {
2606: public void execute(Connection conn)
2607: throws JobPersistenceException {
2608: resumeTriggerGroup(conn, ctxt, groupName);
2609: }
2610: });
2611: }
2612:
2613: /**
2614: * <p>
2615: * Resume (un-pause) all of the <code>{@link org.quartz.Trigger}s</code>
2616: * in the given group.
2617: * </p>
2618: *
2619: * <p>
2620: * If any <code>Trigger</code> missed one or more fire-times, then the
2621: * <code>Trigger</code>'s misfire instruction will be applied.
2622: * </p>
2623: *
2624: * @see #pauseTriggerGroup(Connection, SchedulingContext, String)
2625: */
2626: public void resumeTriggerGroup(Connection conn,
2627: SchedulingContext ctxt, String groupName)
2628: throws JobPersistenceException {
2629:
2630: try {
2631:
2632: getDelegate().deletePausedTriggerGroup(conn, groupName);
2633:
2634: String[] trigNames = getDelegate().selectTriggersInGroup(
2635: conn, groupName);
2636:
2637: for (int i = 0; i < trigNames.length; i++) {
2638: resumeTrigger(conn, ctxt, trigNames[i], groupName);
2639: }
2640:
2641: // TODO: find an efficient way to resume triggers (better than the
2642: // above)... logic below is broken because of
2643: // findTriggersToBeBlocked()
2644: /*
2645: * int res =
2646: * getDelegate().updateTriggerGroupStateFromOtherState(conn,
2647: * groupName, STATE_WAITING, STATE_PAUSED);
2648: *
2649: * if(res > 0) {
2650: *
2651: * long misfireTime = System.currentTimeMillis();
2652: * if(getMisfireThreshold() > 0) misfireTime -=
2653: * getMisfireThreshold();
2654: *
2655: * Key[] misfires =
2656: * getDelegate().selectMisfiredTriggersInGroupInState(conn,
2657: * groupName, STATE_WAITING, misfireTime);
2658: *
2659: * List blockedTriggers = findTriggersToBeBlocked(conn, ctxt,
2660: * groupName);
2661: *
2662: * Iterator itr = blockedTriggers.iterator(); while(itr.hasNext()) {
2663: * Key key = (Key)itr.next();
2664: * getDelegate().updateTriggerState(conn, key.getName(),
2665: * key.getGroup(), STATE_BLOCKED); }
2666: *
2667: * for(int i=0; i < misfires.length; i++) { String
2668: * newState = STATE_WAITING;
2669: * if(blockedTriggers.contains(misfires[i])) newState =
2670: * STATE_BLOCKED; updateMisfiredTrigger(conn, ctxt,
2671: * misfires[i].getName(), misfires[i].getGroup(), newState, true); } }
2672: */
2673:
2674: } catch (SQLException e) {
2675: throw new JobPersistenceException(
2676: "Couldn't pause trigger group '" + groupName
2677: + "': " + e.getMessage(), e);
2678: }
2679: }
2680:
2681: /**
2682: * <p>
2683: * Pause all triggers - equivalent of calling <code>pauseTriggerGroup(group)</code>
2684: * on every group.
2685: * </p>
2686: *
2687: * <p>
2688: * When <code>resumeAll()</code> is called (to un-pause), trigger misfire
2689: * instructions WILL be applied.
2690: * </p>
2691: *
2692: * @see #resumeAll(SchedulingContext)
2693: * @see #pauseTriggerGroup(SchedulingContext, String)
2694: */
2695: public void pauseAll(final SchedulingContext ctxt)
2696: throws JobPersistenceException {
2697: executeInLock(LOCK_TRIGGER_ACCESS,
2698: new VoidTransactionCallback() {
2699: public void execute(Connection conn)
2700: throws JobPersistenceException {
2701: pauseAll(conn, ctxt);
2702: }
2703: });
2704: }
2705:
2706: /**
2707: * <p>
2708: * Pause all triggers - equivalent of calling <code>pauseTriggerGroup(group)</code>
2709: * on every group.
2710: * </p>
2711: *
2712: * <p>
2713: * When <code>resumeAll()</code> is called (to un-pause), trigger misfire
2714: * instructions WILL be applied.
2715: * </p>
2716: *
2717: * @see #resumeAll(SchedulingContext)
2718: * @see #pauseTriggerGroup(SchedulingContext, String)
2719: */
2720: public void pauseAll(Connection conn, SchedulingContext ctxt)
2721: throws JobPersistenceException {
2722:
2723: String[] names = getTriggerGroupNames(conn, ctxt);
2724:
2725: for (int i = 0; i < names.length; i++) {
2726: pauseTriggerGroup(conn, ctxt, names[i]);
2727: }
2728:
2729: try {
2730: if (!getDelegate().isTriggerGroupPaused(conn,
2731: ALL_GROUPS_PAUSED)) {
2732: getDelegate().insertPausedTriggerGroup(conn,
2733: ALL_GROUPS_PAUSED);
2734: }
2735:
2736: } catch (SQLException e) {
2737: throw new JobPersistenceException(
2738: "Couldn't pause all trigger groups: "
2739: + e.getMessage(), e);
2740: }
2741:
2742: }
2743:
2744: /**
2745: * <p>
2746: * Resume (un-pause) all triggers - equivalent of calling <code>resumeTriggerGroup(group)</code>
2747: * on every group.
2748: * </p>
2749: *
2750: * <p>
2751: * If any <code>Trigger</code> missed one or more fire-times, then the
2752: * <code>Trigger</code>'s misfire instruction will be applied.
2753: * </p>
2754: *
2755: * @see #pauseAll(SchedulingContext)
2756: */
2757: public void resumeAll(final SchedulingContext ctxt)
2758: throws JobPersistenceException {
2759: executeInLock(LOCK_TRIGGER_ACCESS,
2760: new VoidTransactionCallback() {
2761: public void execute(Connection conn)
2762: throws JobPersistenceException {
2763: resumeAll(conn, ctxt);
2764: }
2765: });
2766: }
2767:
2768: /**
2769: * protected
2770: * <p>
2771: * Resume (un-pause) all triggers - equivalent of calling <code>resumeTriggerGroup(group)</code>
2772: * on every group.
2773: * </p>
2774: *
2775: * <p>
2776: * If any <code>Trigger</code> missed one or more fire-times, then the
2777: * <code>Trigger</code>'s misfire instruction will be applied.
2778: * </p>
2779: *
2780: * @see #pauseAll(SchedulingContext)
2781: */
2782: public void resumeAll(Connection conn, SchedulingContext ctxt)
2783: throws JobPersistenceException {
2784:
2785: String[] names = getTriggerGroupNames(conn, ctxt);
2786:
2787: for (int i = 0; i < names.length; i++) {
2788: resumeTriggerGroup(conn, ctxt, names[i]);
2789: }
2790:
2791: try {
2792: getDelegate().deletePausedTriggerGroup(conn,
2793: ALL_GROUPS_PAUSED);
2794: } catch (SQLException e) {
2795: throw new JobPersistenceException(
2796: "Couldn't resume all trigger groups: "
2797: + e.getMessage(), e);
2798: }
2799: }
2800:
2801: private static long ftrCtr = System.currentTimeMillis();
2802:
2803: protected synchronized String getFiredTriggerRecordId() {
2804: return getInstanceId() + ftrCtr++;
2805: }
2806:
2807: /**
2808: * <p>
2809: * Get a handle to the next N triggers to be fired, and mark them as 'reserved'
2810: * by the calling scheduler.
2811: * </p>
2812: *
2813: * @see #releaseAcquiredTrigger(SchedulingContext, Trigger)
2814: */
2815: public Trigger acquireNextTrigger(final SchedulingContext ctxt,
2816: final long noLaterThan) throws JobPersistenceException {
2817: return (Trigger) executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
2818: new TransactionCallback() {
2819: public Object execute(Connection conn)
2820: throws JobPersistenceException {
2821: return acquireNextTrigger(conn, ctxt,
2822: noLaterThan);
2823: }
2824: });
2825: }
2826:
2827: // TODO: this really ought to return something like a FiredTriggerBundle,
2828: // so that the fireInstanceId doesn't have to be on the trigger...
2829: protected Trigger acquireNextTrigger(Connection conn,
2830: SchedulingContext ctxt, long noLaterThan)
2831: throws JobPersistenceException {
2832: do {
2833: try {
2834: Key triggerKey = getDelegate().selectTriggerToAcquire(
2835: conn, noLaterThan, getMisfireTime());
2836:
2837: // No trigger is ready to fire yet.
2838: if (triggerKey == null) {
2839: return null;
2840: }
2841:
2842: int rowsUpdated = getDelegate()
2843: .updateTriggerStateFromOtherState(conn,
2844: triggerKey.getName(),
2845: triggerKey.getGroup(), STATE_ACQUIRED,
2846: STATE_WAITING);
2847:
2848: // If our trigger was no longer in the expected state, try a new one.
2849: if (rowsUpdated <= 0) {
2850: continue;
2851: }
2852:
2853: Trigger nextTrigger = retrieveTrigger(conn, ctxt,
2854: triggerKey.getName(), triggerKey.getGroup());
2855:
2856: // If our trigger is no longer available, try a new one.
2857: if (nextTrigger == null) {
2858: continue;
2859: }
2860:
2861: nextTrigger
2862: .setFireInstanceId(getFiredTriggerRecordId());
2863: getDelegate().insertFiredTrigger(conn, nextTrigger,
2864: STATE_ACQUIRED, null);
2865:
2866: return nextTrigger;
2867: } catch (Exception e) {
2868: throw new JobPersistenceException(
2869: "Couldn't acquire next trigger: "
2870: + e.getMessage(), e);
2871: }
2872: } while (true);
2873: }
2874:
2875: /**
2876: * <p>
2877: * Inform the <code>JobStore</code> that the scheduler no longer plans to
2878: * fire the given <code>Trigger</code>, that it had previously acquired
2879: * (reserved).
2880: * </p>
2881: */
2882: public void releaseAcquiredTrigger(final SchedulingContext ctxt,
2883: final Trigger trigger) throws JobPersistenceException {
2884: executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
2885: new VoidTransactionCallback() {
2886: public void execute(Connection conn)
2887: throws JobPersistenceException {
2888: releaseAcquiredTrigger(conn, ctxt, trigger);
2889: }
2890: });
2891: }
2892:
2893: protected void releaseAcquiredTrigger(Connection conn,
2894: SchedulingContext ctxt, Trigger trigger)
2895: throws JobPersistenceException {
2896: try {
2897: getDelegate().updateTriggerStateFromOtherState(conn,
2898: trigger.getName(), trigger.getGroup(),
2899: STATE_WAITING, STATE_ACQUIRED);
2900: getDelegate().deleteFiredTrigger(conn,
2901: trigger.getFireInstanceId());
2902: } catch (SQLException e) {
2903: throw new JobPersistenceException(
2904: "Couldn't release acquired trigger: "
2905: + e.getMessage(), e);
2906: }
2907: }
2908:
2909: /**
2910: * <p>
2911: * Inform the <code>JobStore</code> that the scheduler is now firing the
2912: * given <code>Trigger</code> (executing its associated <code>Job</code>),
2913: * that it had previously acquired (reserved).
2914: * </p>
2915: *
2916: * @return null if the trigger or its job or calendar no longer exist, or
2917: * if the trigger was not successfully put into the 'executing'
2918: * state.
2919: */
2920: public TriggerFiredBundle triggerFired(
2921: final SchedulingContext ctxt, final Trigger trigger)
2922: throws JobPersistenceException {
2923: return (TriggerFiredBundle) executeInNonManagedTXLock(
2924: LOCK_TRIGGER_ACCESS, new TransactionCallback() {
2925: public Object execute(Connection conn)
2926: throws JobPersistenceException {
2927: try {
2928: return triggerFired(conn, ctxt, trigger);
2929: } catch (JobPersistenceException jpe) {
2930: // If job didn't exisit, we still want to commit our work and return null.
2931: if (jpe.getErrorCode() == SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST) {
2932: return null;
2933: } else {
2934: throw jpe;
2935: }
2936: }
2937: }
2938: });
2939: }
2940:
2941: protected TriggerFiredBundle triggerFired(Connection conn,
2942: SchedulingContext ctxt, Trigger trigger)
2943: throws JobPersistenceException {
2944: JobDetail job = null;
2945: Calendar cal = null;
2946:
2947: // Make sure trigger wasn't deleted, paused, or completed...
2948: try { // if trigger was deleted, state will be STATE_DELETED
2949: String state = getDelegate().selectTriggerState(conn,
2950: trigger.getName(), trigger.getGroup());
2951: if (!state.equals(STATE_ACQUIRED)) {
2952: return null;
2953: }
2954: } catch (SQLException e) {
2955: throw new JobPersistenceException(
2956: "Couldn't select trigger state: " + e.getMessage(),
2957: e);
2958: }
2959:
2960: try {
2961: job = retrieveJob(conn, ctxt, trigger.getJobName(), trigger
2962: .getJobGroup());
2963: if (job == null) {
2964: return null;
2965: }
2966: } catch (JobPersistenceException jpe) {
2967: try {
2968: getLog()
2969: .error(
2970: "Error retrieving job, setting trigger state to ERROR.",
2971: jpe);
2972: getDelegate().updateTriggerState(conn,
2973: trigger.getName(), trigger.getGroup(),
2974: STATE_ERROR);
2975: } catch (SQLException sqle) {
2976: getLog().error("Unable to set trigger state to ERROR.",
2977: sqle);
2978: }
2979: throw jpe;
2980: }
2981:
2982: if (trigger.getCalendarName() != null) {
2983: cal = retrieveCalendar(conn, ctxt, trigger
2984: .getCalendarName());
2985: if (cal == null) {
2986: return null;
2987: }
2988: }
2989:
2990: try {
2991: getDelegate().deleteFiredTrigger(conn,
2992: trigger.getFireInstanceId());
2993: getDelegate().insertFiredTrigger(conn, trigger,
2994: STATE_EXECUTING, job);
2995: } catch (SQLException e) {
2996: throw new JobPersistenceException(
2997: "Couldn't insert fired trigger: " + e.getMessage(),
2998: e);
2999: }
3000:
3001: Date prevFireTime = trigger.getPreviousFireTime();
3002:
3003: // call triggered - to update the trigger's next-fire-time state...
3004: trigger.triggered(cal);
3005:
3006: String state = STATE_WAITING;
3007: boolean force = true;
3008:
3009: if (job.isStateful()) {
3010: state = STATE_BLOCKED;
3011: force = false;
3012: try {
3013: getDelegate().updateTriggerStatesForJobFromOtherState(
3014: conn, job.getName(), job.getGroup(),
3015: STATE_BLOCKED, STATE_WAITING);
3016: getDelegate().updateTriggerStatesForJobFromOtherState(
3017: conn, job.getName(), job.getGroup(),
3018: STATE_BLOCKED, STATE_ACQUIRED);
3019: getDelegate().updateTriggerStatesForJobFromOtherState(
3020: conn, job.getName(), job.getGroup(),
3021: STATE_PAUSED_BLOCKED, STATE_PAUSED);
3022: } catch (SQLException e) {
3023: throw new JobPersistenceException(
3024: "Couldn't update states of blocked triggers: "
3025: + e.getMessage(), e);
3026: }
3027: }
3028:
3029: if (trigger.getNextFireTime() == null) {
3030: state = STATE_COMPLETE;
3031: force = true;
3032: }
3033:
3034: storeTrigger(conn, ctxt, trigger, job, true, state, force,
3035: false);
3036:
3037: job.getJobDataMap().clearDirtyFlag();
3038:
3039: return new TriggerFiredBundle(job, trigger, cal, trigger
3040: .getGroup().equals(Scheduler.DEFAULT_RECOVERY_GROUP),
3041: new Date(), trigger.getPreviousFireTime(),
3042: prevFireTime, trigger.getNextFireTime());
3043: }
3044:
3045: /**
3046: * <p>
3047: * Inform the <code>JobStore</code> that the scheduler has completed the
3048: * firing of the given <code>Trigger</code> (and the execution its
3049: * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code>
3050: * in the given <code>JobDetail</code> should be updated if the <code>Job</code>
3051: * is stateful.
3052: * </p>
3053: */
3054: public void triggeredJobComplete(final SchedulingContext ctxt,
3055: final Trigger trigger, final JobDetail jobDetail,
3056: final int triggerInstCode) throws JobPersistenceException {
3057: executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
3058: new VoidTransactionCallback() {
3059: public void execute(Connection conn)
3060: throws JobPersistenceException {
3061: triggeredJobComplete(conn, ctxt, trigger,
3062: jobDetail, triggerInstCode);
3063: }
3064: });
3065: }
3066:
3067: protected void triggeredJobComplete(Connection conn,
3068: SchedulingContext ctxt, Trigger trigger,
3069: JobDetail jobDetail, int triggerInstCode)
3070: throws JobPersistenceException {
3071: try {
3072: if (triggerInstCode == Trigger.INSTRUCTION_DELETE_TRIGGER) {
3073: if (trigger.getNextFireTime() == null) {
3074: // double check for possible reschedule within job
3075: // execution, which would cancel the need to delete...
3076: TriggerStatus stat = getDelegate()
3077: .selectTriggerStatus(conn,
3078: trigger.getName(),
3079: trigger.getGroup());
3080: if (stat != null && stat.getNextFireTime() == null) {
3081: removeTrigger(conn, ctxt, trigger.getName(),
3082: trigger.getGroup());
3083: }
3084: } else {
3085: removeTrigger(conn, ctxt, trigger.getName(),
3086: trigger.getGroup());
3087: }
3088: } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_COMPLETE) {
3089: getDelegate().updateTriggerState(conn,
3090: trigger.getName(), trigger.getGroup(),
3091: STATE_COMPLETE);
3092: } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_ERROR) {
3093: getLog().info(
3094: "Trigger " + trigger.getFullName()
3095: + " set to ERROR state.");
3096: getDelegate().updateTriggerState(conn,
3097: trigger.getName(), trigger.getGroup(),
3098: STATE_ERROR);
3099: } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_COMPLETE) {
3100: getDelegate().updateTriggerStatesForJob(conn,
3101: trigger.getJobName(), trigger.getJobGroup(),
3102: STATE_COMPLETE);
3103: } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR) {
3104: getLog().info(
3105: "All triggers of Job "
3106: + trigger.getFullJobName()
3107: + " set to ERROR state.");
3108: getDelegate().updateTriggerStatesForJob(conn,
3109: trigger.getJobName(), trigger.getJobGroup(),
3110: STATE_ERROR);
3111: }
3112:
3113: if (jobDetail.isStateful()) {
3114: getDelegate().updateTriggerStatesForJobFromOtherState(
3115: conn, jobDetail.getName(),
3116: jobDetail.getGroup(), STATE_WAITING,
3117: STATE_BLOCKED);
3118:
3119: getDelegate().updateTriggerStatesForJobFromOtherState(
3120: conn, jobDetail.getName(),
3121: jobDetail.getGroup(), STATE_PAUSED,
3122: STATE_PAUSED_BLOCKED);
3123:
3124: try {
3125: if (jobDetail.getJobDataMap().isDirty()) {
3126: getDelegate().updateJobData(conn, jobDetail);
3127: }
3128: } catch (IOException e) {
3129: throw new JobPersistenceException(
3130: "Couldn't serialize job data: "
3131: + e.getMessage(), e);
3132: } catch (SQLException e) {
3133: throw new JobPersistenceException(
3134: "Couldn't update job data: "
3135: + e.getMessage(), e);
3136: }
3137: }
3138: } catch (SQLException e) {
3139: throw new JobPersistenceException(
3140: "Couldn't update trigger state(s): "
3141: + e.getMessage(), e);
3142: }
3143:
3144: try {
3145: getDelegate().deleteFiredTrigger(conn,
3146: trigger.getFireInstanceId());
3147: } catch (SQLException e) {
3148: throw new JobPersistenceException(
3149: "Couldn't delete fired trigger: " + e.getMessage(),
3150: e);
3151: }
3152: }
3153:
3154: /**
3155: * <P>
3156: * Get the driver delegate for DB operations.
3157: * </p>
3158: */
3159: protected DriverDelegate getDelegate()
3160: throws NoSuchDelegateException {
3161: if (null == delegate) {
3162: try {
3163: if (delegateClassName != null) {
3164: delegateClass = getClassLoadHelper().loadClass(
3165: delegateClassName);
3166: }
3167:
3168: Constructor ctor = null;
3169: Object[] ctorParams = null;
3170: if (canUseProperties()) {
3171: Class[] ctorParamTypes = new Class[] { Log.class,
3172: String.class, String.class, Boolean.class };
3173: ctor = delegateClass.getConstructor(ctorParamTypes);
3174: ctorParams = new Object[] { getLog(), tablePrefix,
3175: instanceId, new Boolean(canUseProperties()) };
3176: } else {
3177: Class[] ctorParamTypes = new Class[] { Log.class,
3178: String.class, String.class };
3179: ctor = delegateClass.getConstructor(ctorParamTypes);
3180: ctorParams = new Object[] { getLog(), tablePrefix,
3181: instanceId };
3182: }
3183:
3184: delegate = (DriverDelegate) ctor
3185: .newInstance(ctorParams);
3186: } catch (NoSuchMethodException e) {
3187: throw new NoSuchDelegateException(
3188: "Couldn't find delegate constructor: "
3189: + e.getMessage());
3190: } catch (InstantiationException e) {
3191: throw new NoSuchDelegateException(
3192: "Couldn't create delegate: " + e.getMessage());
3193: } catch (IllegalAccessException e) {
3194: throw new NoSuchDelegateException(
3195: "Couldn't create delegate: " + e.getMessage());
3196: } catch (InvocationTargetException e) {
3197: throw new NoSuchDelegateException(
3198: "Couldn't create delegate: " + e.getMessage());
3199: } catch (ClassNotFoundException e) {
3200: throw new NoSuchDelegateException(
3201: "Couldn't load delegate class: "
3202: + e.getMessage());
3203: }
3204: }
3205:
3206: return delegate;
3207: }
3208:
3209: protected Semaphore getLockHandler() {
3210: return lockHandler;
3211: }
3212:
3213: public void setLockHandler(Semaphore lockHandler) {
3214: this .lockHandler = lockHandler;
3215: }
3216:
3217: //---------------------------------------------------------------------------
3218: // Management methods
3219: //---------------------------------------------------------------------------
3220:
3221: protected RecoverMisfiredJobsResult doRecoverMisfires()
3222: throws JobPersistenceException {
3223: boolean transOwner = false;
3224: Connection conn = getNonManagedTXConnection();
3225: try {
3226: RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
3227:
3228: // Before we make the potentially expensive call to acquire the
3229: // trigger lock, peek ahead to see if it is likely we would find
3230: // misfired triggers requiring recovery.
3231: int misfireCount = (getDoubleCheckLockMisfireHandler()) ? getDelegate()
3232: .countMisfiredTriggersInStates(conn,
3233: STATE_MISFIRED, STATE_WAITING,
3234: getMisfireTime())
3235: : Integer.MAX_VALUE;
3236:
3237: if (misfireCount == 0) {
3238: getLog()
3239: .debug(
3240: "Found 0 triggers that missed their scheduled fire-time.");
3241: } else {
3242: transOwner = getLockHandler().obtainLock(conn,
3243: LOCK_TRIGGER_ACCESS);
3244:
3245: result = recoverMisfiredJobs(conn, false);
3246: }
3247:
3248: commitConnection(conn);
3249: return result;
3250: } catch (JobPersistenceException e) {
3251: rollbackConnection(conn);
3252: throw e;
3253: } catch (SQLException e) {
3254: rollbackConnection(conn);
3255: throw new JobPersistenceException(
3256: "Database error recovering from misfires.", e);
3257: } catch (RuntimeException e) {
3258: rollbackConnection(conn);
3259: throw new JobPersistenceException(
3260: "Unexpected runtime exception: " + e.getMessage(),
3261: e);
3262: } finally {
3263: try {
3264: releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
3265: } finally {
3266: cleanupConnection(conn);
3267: }
3268: }
3269: }
3270:
3271: protected void signalSchedulingChange() {
3272: signaler.signalSchedulingChange();
3273: }
3274:
3275: //---------------------------------------------------------------------------
3276: // Cluster management methods
3277: //---------------------------------------------------------------------------
3278:
3279: protected boolean firstCheckIn = true;
3280:
3281: protected long lastCheckin = System.currentTimeMillis();
3282:
3283: protected boolean doCheckin() throws JobPersistenceException {
3284: boolean transOwner = false;
3285: boolean transStateOwner = false;
3286: boolean recovered = false;
3287:
3288: Connection conn = getNonManagedTXConnection();
3289: try {
3290: // Other than the first time, always checkin first to make sure there is
3291: // work to be done before we aquire the lock (since that is expensive,
3292: // and is almost never necessary). This must be done in a separate
3293: // transaction to prevent a deadlock under recovery conditions.
3294: List failedRecords = null;
3295: if (firstCheckIn == false) {
3296: boolean succeeded = false;
3297: try {
3298: failedRecords = clusterCheckIn(conn);
3299: commitConnection(conn);
3300: succeeded = true;
3301: } catch (JobPersistenceException e) {
3302: rollbackConnection(conn);
3303: throw e;
3304: } finally {
3305: // Only cleanup the connection if we failed and are bailing
3306: // as we will otherwise continue to use it.
3307: if (succeeded == false) {
3308: cleanupConnection(conn);
3309: }
3310: }
3311: }
3312:
3313: if (firstCheckIn || (failedRecords.size() > 0)) {
3314: getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
3315: transStateOwner = true;
3316:
3317: // Now that we own the lock, make sure we still have work to do.
3318: // The first time through, we also need to make sure we update/create our state record
3319: failedRecords = (firstCheckIn) ? clusterCheckIn(conn)
3320: : findFailedInstances(conn);
3321:
3322: if (failedRecords.size() > 0) {
3323: getLockHandler().obtainLock(conn,
3324: LOCK_TRIGGER_ACCESS);
3325: //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
3326: transOwner = true;
3327:
3328: clusterRecover(conn, failedRecords);
3329: recovered = true;
3330: }
3331: }
3332:
3333: commitConnection(conn);
3334: } catch (JobPersistenceException e) {
3335: rollbackConnection(conn);
3336: throw e;
3337: } finally {
3338: try {
3339: releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
3340: } finally {
3341: try {
3342: releaseLock(conn, LOCK_STATE_ACCESS,
3343: transStateOwner);
3344: } finally {
3345: cleanupConnection(conn);
3346: }
3347: }
3348: }
3349:
3350: firstCheckIn = false;
3351:
3352: return recovered;
3353: }
3354:
3355: /**
3356: * Get a list of all scheduler instances in the cluster that may have failed.
3357: * This includes this scheduler if it is checking in for the first time.
3358: */
3359: protected List findFailedInstances(Connection conn)
3360: throws JobPersistenceException {
3361: try {
3362: List failedInstances = new LinkedList();
3363: boolean foundThisScheduler = false;
3364: long timeNow = System.currentTimeMillis();
3365:
3366: List states = getDelegate().selectSchedulerStateRecords(
3367: conn, null);
3368:
3369: for (Iterator itr = states.iterator(); itr.hasNext();) {
3370: SchedulerStateRecord rec = (SchedulerStateRecord) itr
3371: .next();
3372:
3373: // find own record...
3374: if (rec.getSchedulerInstanceId()
3375: .equals(getInstanceId())) {
3376: foundThisScheduler = true;
3377: if (firstCheckIn) {
3378: failedInstances.add(rec);
3379: }
3380: } else {
3381: // find failed instances...
3382: if (calcFailedIfAfter(rec) < timeNow) {
3383: failedInstances.add(rec);
3384: }
3385: }
3386: }
3387:
3388: // The first time through, also check for orphaned fired triggers.
3389: if (firstCheckIn) {
3390: failedInstances.addAll(findOrphanedFailedInstances(
3391: conn, states));
3392: }
3393:
3394: // If not the first time but we didn't find our own instance, then
3395: // Someone must have done recovery for us.
3396: if ((foundThisScheduler == false)
3397: && (firstCheckIn == false)) {
3398: // TODO: revisit when handle self-failed-out implied (see TODO in clusterCheckIn() below)
3399: getLog()
3400: .warn(
3401: "This scheduler instance ("
3402: + getInstanceId()
3403: + ") is still "
3404: + "active but was recovered by another instance in the cluster. "
3405: + "This may cause inconsistent behavior.");
3406: }
3407:
3408: return failedInstances;
3409: } catch (Exception e) {
3410: lastCheckin = System.currentTimeMillis();
3411: throw new JobPersistenceException(
3412: "Failure identifying failed instances when checking-in: "
3413: + e.getMessage(), e);
3414: }
3415: }
3416:
3417: /**
3418: * Create dummy <code>SchedulerStateRecord</code> objects for fired triggers
3419: * that have no scheduler state record. Checkin timestamp and interval are
3420: * left as zero on these dummy <code>SchedulerStateRecord</code> objects.
3421: *
3422: * @param schedulerStateRecords List of all current <code>SchedulerStateRecords</code>
3423: */
3424: private List findOrphanedFailedInstances(Connection conn,
3425: List schedulerStateRecords) throws SQLException,
3426: NoSuchDelegateException {
3427: List orphanedInstances = new ArrayList();
3428:
3429: Set allFiredTriggerInstanceNames = getDelegate()
3430: .selectFiredTriggerInstanceNames(conn);
3431: if (allFiredTriggerInstanceNames.isEmpty() == false) {
3432: for (Iterator schedulerStateIter = schedulerStateRecords
3433: .iterator(); schedulerStateIter.hasNext();) {
3434: SchedulerStateRecord rec = (SchedulerStateRecord) schedulerStateIter
3435: .next();
3436:
3437: allFiredTriggerInstanceNames.remove(rec
3438: .getSchedulerInstanceId());
3439: }
3440:
3441: for (Iterator orphanIter = allFiredTriggerInstanceNames
3442: .iterator(); orphanIter.hasNext();) {
3443:
3444: SchedulerStateRecord orphanedInstance = new SchedulerStateRecord();
3445: orphanedInstance
3446: .setSchedulerInstanceId((String) orphanIter
3447: .next());
3448:
3449: orphanedInstances.add(orphanedInstance);
3450:
3451: getLog().warn(
3452: "Found orphaned fired triggers for instance: "
3453: + orphanedInstance
3454: .getSchedulerInstanceId());
3455: }
3456: }
3457:
3458: return orphanedInstances;
3459: }
3460:
3461: protected long calcFailedIfAfter(SchedulerStateRecord rec) {
3462: return rec.getCheckinTimestamp()
3463: + Math.max(rec.getCheckinInterval(), (System
3464: .currentTimeMillis() - lastCheckin)) + 7500L;
3465: }
3466:
3467: protected List clusterCheckIn(Connection conn)
3468: throws JobPersistenceException {
3469:
3470: List failedInstances = findFailedInstances(conn);
3471:
3472: try {
3473: // TODO: handle self-failed-out
3474:
3475: // check in...
3476: lastCheckin = System.currentTimeMillis();
3477: if (getDelegate().updateSchedulerState(conn,
3478: getInstanceId(), lastCheckin) == 0) {
3479: getDelegate().insertSchedulerState(conn,
3480: getInstanceId(), lastCheckin,
3481: getClusterCheckinInterval());
3482: }
3483:
3484: } catch (Exception e) {
3485: throw new JobPersistenceException(
3486: "Failure updating scheduler state when checking-in: "
3487: + e.getMessage(), e);
3488: }
3489:
3490: return failedInstances;
3491: }
3492:
3493: protected void clusterRecover(Connection conn, List failedInstances)
3494: throws JobPersistenceException {
3495:
3496: if (failedInstances.size() > 0) {
3497:
3498: long recoverIds = System.currentTimeMillis();
3499:
3500: logWarnIfNonZero(failedInstances.size(),
3501: "ClusterManager: detected "
3502: + failedInstances.size()
3503: + " failed or restarted instances.");
3504: try {
3505: Iterator itr = failedInstances.iterator();
3506: while (itr.hasNext()) {
3507: SchedulerStateRecord rec = (SchedulerStateRecord) itr
3508: .next();
3509:
3510: getLog().info(
3511: "ClusterManager: Scanning for instance \""
3512: + rec.getSchedulerInstanceId()
3513: + "\"'s failed in-progress jobs.");
3514:
3515: List firedTriggerRecs = getDelegate()
3516: .selectInstancesFiredTriggerRecords(conn,
3517: rec.getSchedulerInstanceId());
3518:
3519: int acquiredCount = 0;
3520: int recoveredCount = 0;
3521: int otherCount = 0;
3522:
3523: Set triggerKeys = new HashSet();
3524:
3525: Iterator ftItr = firedTriggerRecs.iterator();
3526: while (ftItr.hasNext()) {
3527: FiredTriggerRecord ftRec = (FiredTriggerRecord) ftItr
3528: .next();
3529:
3530: Key tKey = ftRec.getTriggerKey();
3531: Key jKey = ftRec.getJobKey();
3532:
3533: triggerKeys.add(tKey);
3534:
3535: // release blocked triggers..
3536: if (ftRec.getFireInstanceState().equals(
3537: STATE_BLOCKED)) {
3538: getDelegate()
3539: .updateTriggerStatesForJobFromOtherState(
3540: conn, jKey.getName(),
3541: jKey.getGroup(),
3542: STATE_WAITING,
3543: STATE_BLOCKED);
3544: } else if (ftRec.getFireInstanceState().equals(
3545: STATE_PAUSED_BLOCKED)) {
3546: getDelegate()
3547: .updateTriggerStatesForJobFromOtherState(
3548: conn, jKey.getName(),
3549: jKey.getGroup(),
3550: STATE_PAUSED,
3551: STATE_PAUSED_BLOCKED);
3552: }
3553:
3554: // release acquired triggers..
3555: if (ftRec.getFireInstanceState().equals(
3556: STATE_ACQUIRED)) {
3557: getDelegate()
3558: .updateTriggerStateFromOtherState(
3559: conn, tKey.getName(),
3560: tKey.getGroup(),
3561: STATE_WAITING,
3562: STATE_ACQUIRED);
3563: acquiredCount++;
3564: } else if (ftRec.isJobRequestsRecovery()) {
3565: // handle jobs marked for recovery that were not fully
3566: // executed..
3567: if (jobExists(conn, jKey.getName(), jKey
3568: .getGroup())) {
3569: SimpleTrigger rcvryTrig = new SimpleTrigger(
3570: "recover_"
3571: + rec
3572: .getSchedulerInstanceId()
3573: + "_"
3574: + String
3575: .valueOf(recoverIds++),
3576: Scheduler.DEFAULT_RECOVERY_GROUP,
3577: new Date(ftRec
3578: .getFireTimestamp()));
3579: rcvryTrig.setJobName(jKey.getName());
3580: rcvryTrig.setJobGroup(jKey.getGroup());
3581: rcvryTrig
3582: .setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW);
3583: rcvryTrig.setPriority(ftRec
3584: .getPriority());
3585: JobDataMap jd = getDelegate()
3586: .selectTriggerJobDataMap(conn,
3587: tKey.getName(),
3588: tKey.getGroup());
3589: jd
3590: .put(
3591: Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME,
3592: tKey.getName());
3593: jd
3594: .put(
3595: Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP,
3596: tKey.getGroup());
3597: jd
3598: .put(
3599: Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS,
3600: String
3601: .valueOf(ftRec
3602: .getFireTimestamp()));
3603: rcvryTrig.setJobDataMap(jd);
3604:
3605: rcvryTrig.computeFirstFireTime(null);
3606: storeTrigger(conn, null, rcvryTrig,
3607: null, false, STATE_WAITING,
3608: false, true);
3609: recoveredCount++;
3610: } else {
3611: getLog()
3612: .warn(
3613: "ClusterManager: failed job '"
3614: + jKey
3615: + "' no longer exists, cannot schedule recovery.");
3616: otherCount++;
3617: }
3618: } else {
3619: otherCount++;
3620: }
3621:
3622: // free up stateful job's triggers
3623: if (ftRec.isJobIsStateful()) {
3624: getDelegate()
3625: .updateTriggerStatesForJobFromOtherState(
3626: conn, jKey.getName(),
3627: jKey.getGroup(),
3628: STATE_WAITING,
3629: STATE_BLOCKED);
3630: getDelegate()
3631: .updateTriggerStatesForJobFromOtherState(
3632: conn, jKey.getName(),
3633: jKey.getGroup(),
3634: STATE_PAUSED,
3635: STATE_PAUSED_BLOCKED);
3636: }
3637: }
3638:
3639: getDelegate().deleteFiredTriggers(conn,
3640: rec.getSchedulerInstanceId());
3641:
3642: // Check if any of the fired triggers we just deleted were the last fired trigger
3643: // records of a COMPLETE trigger.
3644: int completeCount = 0;
3645: for (Iterator triggerKeyIter = triggerKeys
3646: .iterator(); triggerKeyIter.hasNext();) {
3647: Key triggerKey = (Key) triggerKeyIter.next();
3648:
3649: if (getDelegate().selectTriggerState(conn,
3650: triggerKey.getName(),
3651: triggerKey.getGroup()).equals(
3652: STATE_COMPLETE)) {
3653: List firedTriggers = getDelegate()
3654: .selectFiredTriggerRecords(conn,
3655: triggerKey.getName(),
3656: triggerKey.getGroup());
3657: if (firedTriggers.isEmpty()) {
3658: SchedulingContext schedulingContext = new SchedulingContext();
3659: schedulingContext
3660: .setInstanceId(instanceId);
3661:
3662: if (removeTrigger(conn,
3663: schedulingContext, triggerKey
3664: .getName(), triggerKey
3665: .getGroup())) {
3666: completeCount++;
3667: }
3668: }
3669: }
3670: }
3671:
3672: logWarnIfNonZero(acquiredCount,
3673: "ClusterManager: ......Freed "
3674: + acquiredCount
3675: + " acquired trigger(s).");
3676: logWarnIfNonZero(completeCount,
3677: "ClusterManager: ......Deleted "
3678: + completeCount
3679: + " complete triggers(s).");
3680: logWarnIfNonZero(
3681: recoveredCount,
3682: "ClusterManager: ......Scheduled "
3683: + recoveredCount
3684: + " recoverable job(s) for recovery.");
3685: logWarnIfNonZero(otherCount,
3686: "ClusterManager: ......Cleaned-up "
3687: + otherCount
3688: + " other failed job(s).");
3689:
3690: if (rec.getSchedulerInstanceId().equals(
3691: getInstanceId()) == false) {
3692: getDelegate().deleteSchedulerState(conn,
3693: rec.getSchedulerInstanceId());
3694: }
3695: }
3696: } catch (Exception e) {
3697: throw new JobPersistenceException(
3698: "Failure recovering jobs: " + e.getMessage(), e);
3699: }
3700: }
3701: }
3702:
3703: protected void logWarnIfNonZero(int val, String warning) {
3704: if (val > 0) {
3705: getLog().info(warning);
3706: } else {
3707: getLog().debug(warning);
3708: }
3709: }
3710:
3711: /**
3712: * <p>
3713: * Cleanup the given database connection. This means restoring
3714: * any modified auto commit or transaction isolation connection
3715: * attributes, and then closing the underlying connection.
3716: * </p>
3717: *
3718: * <p>
3719: * This is separate from closeConnection() because the Spring
3720: * integration relies on being able to overload closeConnection() and
3721: * expects the same connection back that it originally returned
3722: * from the datasource.
3723: * </p>
3724: *
3725: * @see #closeConnection(Connection)
3726: */
3727: protected void cleanupConnection(Connection conn) {
3728: if (conn != null) {
3729: if (conn instanceof Proxy) {
3730: Proxy connProxy = (Proxy) conn;
3731:
3732: InvocationHandler invocationHandler = Proxy
3733: .getInvocationHandler(connProxy);
3734: if (invocationHandler instanceof AttributeRestoringConnectionInvocationHandler) {
3735: AttributeRestoringConnectionInvocationHandler connHandler = (AttributeRestoringConnectionInvocationHandler) invocationHandler;
3736:
3737: connHandler.restoreOriginalAtributes();
3738: closeConnection(connHandler.getWrappedConnection());
3739: return;
3740: }
3741: }
3742:
3743: // Wan't a Proxy, or was a Proxy, but wasn't ours.
3744: closeConnection(conn);
3745: }
3746: }
3747:
3748: /**
3749: * Closes the supplied <code>Connection</code>.
3750: * <p>
3751: * Ignores a <code>null Connection</code>.
3752: * Any exception thrown trying to close the <code>Connection</code> is
3753: * logged and ignored.
3754: * </p>
3755: *
3756: * @param conn The <code>Connection</code> to close (Optional).
3757: */
3758: protected void closeConnection(Connection conn) {
3759: if (conn != null) {
3760: try {
3761: conn.close();
3762: } catch (SQLException e) {
3763: getLog().error("Failed to close Connection", e);
3764: } catch (Throwable e) {
3765: getLog()
3766: .error(
3767: "Unexpected exception closing Connection."
3768: + " This is often due to a Connection being returned after or during shutdown.",
3769: e);
3770: }
3771: }
3772: }
3773:
3774: /**
3775: * Rollback the supplied connection.
3776: *
3777: * <p>
3778: * Logs any SQLException it gets trying to rollback, but will not propogate
3779: * the exception lest it mask the exception that caused the caller to
3780: * need to rollback in the first place.
3781: * </p>
3782: *
3783: * @param conn (Optional)
3784: */
3785: protected void rollbackConnection(Connection conn) {
3786: if (conn != null) {
3787: try {
3788: conn.rollback();
3789: } catch (SQLException e) {
3790: getLog().error(
3791: "Couldn't rollback jdbc connection. "
3792: + e.getMessage(), e);
3793: }
3794: }
3795: }
3796:
3797: /**
3798: * Commit the supplied connection
3799: *
3800: * @param conn (Optional)
3801: * @throws JobPersistenceException thrown if a SQLException occurs when the
3802: * connection is committed
3803: */
3804: protected void commitConnection(Connection conn)
3805: throws JobPersistenceException {
3806:
3807: if (conn != null) {
3808: try {
3809: conn.commit();
3810: } catch (SQLException e) {
3811: throw new JobPersistenceException(
3812: "Couldn't commit jdbc connection. "
3813: + e.getMessage(), e);
3814: }
3815: }
3816: }
3817:
3818: /**
3819: * Implement this interface to provide the code to execute within
3820: * the a transaction template. If no return value is required, execute
3821: * should just return null.
3822: *
3823: * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
3824: * @see JobStoreSupport#executeInLock(String, TransactionCallback)
3825: * @see JobStoreSupport#executeWithoutLock(TransactionCallback)
3826: */
3827: protected interface TransactionCallback {
3828: Object execute(Connection conn) throws JobPersistenceException;
3829: }
3830:
3831: /**
3832: * Implement this interface to provide the code to execute within
3833: * the a transaction template that has no return value.
3834: *
3835: * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
3836: */
3837: protected interface VoidTransactionCallback {
3838: void execute(Connection conn) throws JobPersistenceException;
3839: }
3840:
3841: /**
3842: * Execute the given callback in a transaction. Depending on the JobStore,
3843: * the surrounding transaction may be assumed to be already present
3844: * (managed).
3845: *
3846: * <p>
3847: * This method just forwards to executeInLock() with a null lockName.
3848: * </p>
3849: *
3850: * @see #executeInLock(String, TransactionCallback)
3851: */
3852: public Object executeWithoutLock(TransactionCallback txCallback)
3853: throws JobPersistenceException {
3854: return executeInLock(null, txCallback);
3855: }
3856:
3857: /**
3858: * Execute the given callback having aquired the given lock.
3859: * Depending on the JobStore, the surrounding transaction may be
3860: * assumed to be already present (managed). This version is just a
3861: * handy wrapper around executeInLock that doesn't require a return
3862: * value.
3863: *
3864: * @param lockName The name of the lock to aquire, for example
3865: * "TRIGGER_ACCESS". If null, then no lock is aquired, but the
3866: * lockCallback is still executed in a transaction.
3867: *
3868: * @see #executeInLock(String, TransactionCallback)
3869: */
3870: protected void executeInLock(final String lockName,
3871: final VoidTransactionCallback txCallback)
3872: throws JobPersistenceException {
3873: executeInLock(lockName, new TransactionCallback() {
3874: public Object execute(Connection conn)
3875: throws JobPersistenceException {
3876: txCallback.execute(conn);
3877: return null;
3878: }
3879: });
3880: }
3881:
3882: /**
3883: * Execute the given callback having aquired the given lock.
3884: * Depending on the JobStore, the surrounding transaction may be
3885: * assumed to be already present (managed).
3886: *
3887: * @param lockName The name of the lock to aquire, for example
3888: * "TRIGGER_ACCESS". If null, then no lock is aquired, but the
3889: * lockCallback is still executed in a transaction.
3890: */
3891: protected abstract Object executeInLock(String lockName,
3892: TransactionCallback txCallback)
3893: throws JobPersistenceException;
3894:
3895: /**
3896: * Execute the given callback having optionally aquired the given lock.
3897: * This uses the non-managed transaction connection. This version is just a
3898: * handy wrapper around executeInNonManagedTXLock that doesn't require a return
3899: * value.
3900: *
3901: * @param lockName The name of the lock to aquire, for example
3902: * "TRIGGER_ACCESS". If null, then no lock is aquired, but the
3903: * lockCallback is still executed in a non-managed transaction.
3904: *
3905: * @see #executeInNonManagedTXLock(String, TransactionCallback)
3906: */
3907: protected void executeInNonManagedTXLock(final String lockName,
3908: final VoidTransactionCallback txCallback)
3909: throws JobPersistenceException {
3910: executeInNonManagedTXLock(lockName, new TransactionCallback() {
3911: public Object execute(Connection conn)
3912: throws JobPersistenceException {
3913: txCallback.execute(conn);
3914: return null;
3915: }
3916: });
3917: }
3918:
3919: /**
3920: * Execute the given callback having optionally aquired the given lock.
3921: * This uses the non-managed transaction connection.
3922: *
3923: * @param lockName The name of the lock to aquire, for example
3924: * "TRIGGER_ACCESS". If null, then no lock is aquired, but the
3925: * lockCallback is still executed in a non-managed transaction.
3926: */
3927: protected Object executeInNonManagedTXLock(String lockName,
3928: TransactionCallback txCallback)
3929: throws JobPersistenceException {
3930: boolean transOwner = false;
3931: Connection conn = null;
3932: try {
3933: if (lockName != null) {
3934: // If we aren't using db locks, then delay getting DB connection
3935: // until after aquiring the lock since it isn't needed.
3936: if (getLockHandler().requiresConnection()) {
3937: conn = getNonManagedTXConnection();
3938: }
3939:
3940: transOwner = getLockHandler()
3941: .obtainLock(conn, lockName);
3942: }
3943:
3944: if (conn == null) {
3945: conn = getNonManagedTXConnection();
3946: }
3947:
3948: Object result = txCallback.execute(conn);
3949: commitConnection(conn);
3950: return result;
3951: } catch (JobPersistenceException e) {
3952: rollbackConnection(conn);
3953: throw e;
3954: } catch (RuntimeException e) {
3955: rollbackConnection(conn);
3956: throw new JobPersistenceException(
3957: "Unexpected runtime exception: " + e.getMessage(),
3958: e);
3959: } finally {
3960: try {
3961: releaseLock(conn, lockName, transOwner);
3962: } finally {
3963: cleanupConnection(conn);
3964: }
3965: }
3966: }
3967:
3968: /////////////////////////////////////////////////////////////////////////////
3969: //
3970: // ClusterManager Thread
3971: //
3972: /////////////////////////////////////////////////////////////////////////////
3973:
3974: class ClusterManager extends Thread {
3975:
3976: private boolean shutdown = false;
3977:
3978: private int numFails = 0;
3979:
3980: ClusterManager() {
3981: this .setPriority(Thread.NORM_PRIORITY + 2);
3982: this .setName("QuartzScheduler_" + instanceName + "-"
3983: + instanceId + "_ClusterManager");
3984: this .setDaemon(getMakeThreadsDaemons());
3985: }
3986:
3987: public void initialize() {
3988: this .manage();
3989: this .start();
3990: }
3991:
3992: public void shutdown() {
3993: shutdown = true;
3994: this .interrupt();
3995: }
3996:
3997: private boolean manage() {
3998: boolean res = false;
3999: try {
4000:
4001: res = doCheckin();
4002:
4003: numFails = 0;
4004: getLog().debug("ClusterManager: Check-in complete.");
4005: } catch (Exception e) {
4006: if (numFails % 4 == 0) {
4007: getLog().error(
4008: "ClusterManager: Error managing cluster: "
4009: + e.getMessage(), e);
4010: }
4011: numFails++;
4012: }
4013: return res;
4014: }
4015:
4016: public void run() {
4017: while (!shutdown) {
4018:
4019: if (!shutdown) {
4020: long timeToSleep = getClusterCheckinInterval();
4021: long transpiredTime = (System.currentTimeMillis() - lastCheckin);
4022: timeToSleep = timeToSleep - transpiredTime;
4023: if (timeToSleep <= 0) {
4024: timeToSleep = 100L;
4025: }
4026:
4027: if (numFails > 0) {
4028: timeToSleep = Math.max(getDbRetryInterval(),
4029: timeToSleep);
4030: }
4031:
4032: try {
4033: Thread.sleep(timeToSleep);
4034: } catch (Exception ignore) {
4035: }
4036: }
4037:
4038: if (!shutdown && this .manage()) {
4039: signalSchedulingChange();
4040: }
4041:
4042: }//while !shutdown
4043: }
4044: }
4045:
4046: /////////////////////////////////////////////////////////////////////////////
4047: //
4048: // MisfireHandler Thread
4049: //
4050: /////////////////////////////////////////////////////////////////////////////
4051:
4052: class MisfireHandler extends Thread {
4053:
4054: private boolean shutdown = false;
4055:
4056: private int numFails = 0;
4057:
4058: MisfireHandler() {
4059: this .setName("QuartzScheduler_" + instanceName + "-"
4060: + instanceId + "_MisfireHandler");
4061: this .setDaemon(getMakeThreadsDaemons());
4062: }
4063:
4064: public void initialize() {
4065: //this.manage();
4066: this .start();
4067: }
4068:
4069: public void shutdown() {
4070: shutdown = true;
4071: this .interrupt();
4072: }
4073:
4074: private RecoverMisfiredJobsResult manage() {
4075: try {
4076: getLog().debug(
4077: "MisfireHandler: scanning for misfires...");
4078:
4079: RecoverMisfiredJobsResult res = doRecoverMisfires();
4080: numFails = 0;
4081: return res;
4082: } catch (Exception e) {
4083: if (numFails % 4 == 0) {
4084: getLog().error(
4085: "MisfireHandler: Error handling misfires: "
4086: + e.getMessage(), e);
4087: }
4088: numFails++;
4089: }
4090: return RecoverMisfiredJobsResult.NO_OP;
4091: }
4092:
4093: public void run() {
4094:
4095: while (!shutdown) {
4096:
4097: long sTime = System.currentTimeMillis();
4098:
4099: RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
4100:
4101: if (recoverMisfiredJobsResult
4102: .getProcessedMisfiredTriggerCount() > 0) {
4103: signalSchedulingChange();
4104: }
4105:
4106: if (!shutdown) {
4107: long timeToSleep = 50l; // At least a short pause to help balance threads
4108: if (!recoverMisfiredJobsResult
4109: .hasMoreMisfiredTriggers()) {
4110: timeToSleep = getMisfireThreshold()
4111: - (System.currentTimeMillis() - sTime);
4112: if (timeToSleep <= 0) {
4113: timeToSleep = 50l;
4114: }
4115:
4116: if (numFails > 0) {
4117: timeToSleep = Math.max(
4118: getDbRetryInterval(), timeToSleep);
4119: }
4120: }
4121:
4122: try {
4123: Thread.sleep(timeToSleep);
4124: } catch (Exception ignore) {
4125: }
4126: }//while !shutdown
4127: }
4128: }
4129: }
4130: }
4131:
4132: // EOF
|