Source Code Cross Referenced for JobStoreSupport.java in  » Project-Management » quartz » org » quartz » impl » jdbcjobstore » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Project Management » quartz » org.quartz.impl.jdbcjobstore 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /* 
0002:         * Copyright 2004-2005 OpenSymphony 
0003:         * 
0004:         * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
0005:         * use this file except in compliance with the License. You may obtain a copy 
0006:         * of the License at 
0007:         * 
0008:         *   http://www.apache.org/licenses/LICENSE-2.0 
0009:         *   
0010:         * Unless required by applicable law or agreed to in writing, software 
0011:         * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
0012:         * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
0013:         * License for the specific language governing permissions and limitations 
0014:         * under the License.
0015:         * 
0016:         */
0017:
0018:        /*
0019:         * Previously Copyright (c) 2001-2004 James House
0020:         */
0021:        package org.quartz.impl.jdbcjobstore;
0022:
0023:        import java.io.IOException;
0024:        import java.lang.reflect.Constructor;
0025:        import java.lang.reflect.InvocationHandler;
0026:        import java.lang.reflect.InvocationTargetException;
0027:        import java.lang.reflect.Proxy;
0028:        import java.sql.Connection;
0029:        import java.sql.SQLException;
0030:        import java.util.ArrayList;
0031:        import java.util.Date;
0032:        import java.util.HashMap;
0033:        import java.util.HashSet;
0034:        import java.util.Iterator;
0035:        import java.util.LinkedList;
0036:        import java.util.List;
0037:        import java.util.Set;
0038:
0039:        import org.apache.commons.logging.Log;
0040:        import org.apache.commons.logging.LogFactory;
0041:        import org.quartz.Calendar;
0042:        import org.quartz.CronTrigger;
0043:        import org.quartz.JobDataMap;
0044:        import org.quartz.JobDetail;
0045:        import org.quartz.JobPersistenceException;
0046:        import org.quartz.ObjectAlreadyExistsException;
0047:        import org.quartz.Scheduler;
0048:        import org.quartz.SchedulerConfigException;
0049:        import org.quartz.SchedulerException;
0050:        import org.quartz.SimpleTrigger;
0051:        import org.quartz.Trigger;
0052:        import org.quartz.core.SchedulingContext;
0053:        import org.quartz.spi.ClassLoadHelper;
0054:        import org.quartz.spi.JobStore;
0055:        import org.quartz.spi.SchedulerSignaler;
0056:        import org.quartz.spi.TriggerFiredBundle;
0057:        import org.quartz.utils.DBConnectionManager;
0058:        import org.quartz.utils.Key;
0059:        import org.quartz.utils.TriggerStatus;
0060:
0061:        /**
0062:         * <p>
0063:         * Contains base functionality for JDBC-based JobStore implementations.
0064:         * </p>
0065:         * 
0066:         * @author <a href="mailto:jeff@binaryfeed.org">Jeffrey Wescott</a>
0067:         * @author James House
0068:         */
0069:        public abstract class JobStoreSupport implements  JobStore, Constants {
0070:
0071:            /*
0072:             * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0073:             * 
0074:             * Constants.
0075:             * 
0076:             * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0077:             */
0078:
0079:            protected static String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";
0080:
0081:            protected static String LOCK_JOB_ACCESS = "JOB_ACCESS";
0082:
0083:            protected static String LOCK_CALENDAR_ACCESS = "CALENDAR_ACCESS";
0084:
0085:            protected static String LOCK_STATE_ACCESS = "STATE_ACCESS";
0086:
0087:            protected static String LOCK_MISFIRE_ACCESS = "MISFIRE_ACCESS";
0088:
0089:            /*
0090:             * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0091:             * 
0092:             * Data members.
0093:             * 
0094:             * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0095:             */
0096:
0097:            protected String dsName;
0098:
0099:            protected String tablePrefix = DEFAULT_TABLE_PREFIX;
0100:
0101:            protected boolean useProperties = false;
0102:
0103:            protected String instanceId;
0104:
0105:            protected String instanceName;
0106:
0107:            protected String delegateClassName;
0108:            protected Class delegateClass = StdJDBCDelegate.class;
0109:
0110:            protected HashMap calendarCache = new HashMap();
0111:
0112:            private DriverDelegate delegate;
0113:
0114:            private long misfireThreshold = 60000L; // one minute
0115:
0116:            private boolean dontSetAutoCommitFalse = false;
0117:
0118:            private boolean isClustered = false;
0119:
0120:            private boolean useDBLocks = false;
0121:
0122:            private boolean lockOnInsert = true;
0123:
0124:            private Semaphore lockHandler = null; // set in initialize() method...
0125:
0126:            private String selectWithLockSQL = null;
0127:
0128:            private long clusterCheckinInterval = 7500L;
0129:
0130:            private ClusterManager clusterManagementThread = null;
0131:
0132:            private MisfireHandler misfireHandler = null;
0133:
0134:            private ClassLoadHelper classLoadHelper;
0135:
0136:            private SchedulerSignaler signaler;
0137:
0138:            protected int maxToRecoverAtATime = 20;
0139:
0140:            private boolean setTxIsolationLevelSequential = false;
0141:
0142:            private long dbRetryInterval = 10000;
0143:
0144:            private boolean makeThreadsDaemons = false;
0145:
0146:            private boolean doubleCheckLockMisfireHandler = true;
0147:
0148:            private final Log log = LogFactory.getLog(getClass());
0149:
0150:            /*
0151:             * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0152:             * 
0153:             * Interface.
0154:             * 
0155:             * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0156:             */
0157:
0158:            /**
0159:             * <p>
0160:             * Set the name of the <code>DataSource</code> that should be used for
0161:             * performing database functions.
0162:             * </p>
0163:             */
0164:            public void setDataSource(String dsName) {
0165:                this .dsName = dsName;
0166:            }
0167:
0168:            /**
0169:             * <p>
0170:             * Get the name of the <code>DataSource</code> that should be used for
0171:             * performing database functions.
0172:             * </p>
0173:             */
0174:            public String getDataSource() {
0175:                return dsName;
0176:            }
0177:
0178:            /**
0179:             * <p>
0180:             * Set the prefix that should be pre-pended to all table names.
0181:             * </p>
0182:             */
0183:            public void setTablePrefix(String prefix) {
0184:                if (prefix == null) {
0185:                    prefix = "";
0186:                }
0187:
0188:                this .tablePrefix = prefix;
0189:            }
0190:
0191:            /**
0192:             * <p>
0193:             * Get the prefix that should be pre-pended to all table names.
0194:             * </p>
0195:             */
0196:            public String getTablePrefix() {
0197:                return tablePrefix;
0198:            }
0199:
0200:            /**
0201:             * <p>
0202:             * Set whether String-only properties will be handled in JobDataMaps.
0203:             * </p>
0204:             */
0205:            public void setUseProperties(String useProp) {
0206:                if (useProp == null) {
0207:                    useProp = "false";
0208:                }
0209:
0210:                this .useProperties = Boolean.valueOf(useProp).booleanValue();
0211:            }
0212:
0213:            /**
0214:             * <p>
0215:             * Get whether String-only properties will be handled in JobDataMaps.
0216:             * </p>
0217:             */
0218:            public boolean canUseProperties() {
0219:                return useProperties;
0220:            }
0221:
0222:            /**
0223:             * <p>
0224:             * Set the instance Id of the Scheduler (must be unique within a cluster).
0225:             * </p>
0226:             */
0227:            public void setInstanceId(String instanceId) {
0228:                this .instanceId = instanceId;
0229:            }
0230:
0231:            /**
0232:             * <p>
0233:             * Get the instance Id of the Scheduler (must be unique within a cluster).
0234:             * </p>
0235:             */
0236:            public String getInstanceId() {
0237:
0238:                return instanceId;
0239:            }
0240:
0241:            /**
0242:             * Set the instance name of the Scheduler (must be unique within this server instance).
0243:             */
0244:            public void setInstanceName(String instanceName) {
0245:                this .instanceName = instanceName;
0246:            }
0247:
0248:            /**
0249:             * Get the instance name of the Scheduler (must be unique within this server instance).
0250:             */
0251:            public String getInstanceName() {
0252:
0253:                return instanceName;
0254:            }
0255:
0256:            /**
0257:             * <p>
0258:             * Set whether this instance is part of a cluster.
0259:             * </p>
0260:             */
0261:            public void setIsClustered(boolean isClustered) {
0262:                this .isClustered = isClustered;
0263:            }
0264:
0265:            /**
0266:             * <p>
0267:             * Get whether this instance is part of a cluster.
0268:             * </p>
0269:             */
0270:            public boolean isClustered() {
0271:                return isClustered;
0272:            }
0273:
0274:            /**
0275:             * <p>
0276:             * Get the frequency (in milliseconds) at which this instance "checks-in"
0277:             * with the other instances of the cluster. -- Affects the rate of
0278:             * detecting failed instances.
0279:             * </p>
0280:             */
0281:            public long getClusterCheckinInterval() {
0282:                return clusterCheckinInterval;
0283:            }
0284:
0285:            /**
0286:             * <p>
0287:             * Set the frequency (in milliseconds) at which this instance "checks-in"
0288:             * with the other instances of the cluster. -- Affects the rate of
0289:             * detecting failed instances.
0290:             * </p>
0291:             */
0292:            public void setClusterCheckinInterval(long l) {
0293:                clusterCheckinInterval = l;
0294:            }
0295:
0296:            /**
0297:             * <p>
0298:             * Get the maximum number of misfired triggers that the misfire handling
0299:             * thread will try to recover at one time (within one transaction).  The
0300:             * default is 20.
0301:             * </p>
0302:             */
0303:            public int getMaxMisfiresToHandleAtATime() {
0304:                return maxToRecoverAtATime;
0305:            }
0306:
0307:            /**
0308:             * <p>
0309:             * Set the maximum number of misfired triggers that the misfire handling
0310:             * thread will try to recover at one time (within one transaction).  The
0311:             * default is 20.
0312:             * </p>
0313:             */
0314:            public void setMaxMisfiresToHandleAtATime(int maxToRecoverAtATime) {
0315:                this .maxToRecoverAtATime = maxToRecoverAtATime;
0316:            }
0317:
0318:            /**
0319:             * @return Returns the dbRetryInterval.
0320:             */
0321:            public long getDbRetryInterval() {
0322:                return dbRetryInterval;
0323:            }
0324:
0325:            /**
0326:             * @param dbRetryInterval The dbRetryInterval to set.
0327:             */
0328:            public void setDbRetryInterval(long dbRetryInterval) {
0329:                this .dbRetryInterval = dbRetryInterval;
0330:            }
0331:
0332:            /**
0333:             * <p>
0334:             * Set whether this instance should use database-based thread
0335:             * synchronization.
0336:             * </p>
0337:             */
0338:            public void setUseDBLocks(boolean useDBLocks) {
0339:                this .useDBLocks = useDBLocks;
0340:            }
0341:
0342:            /**
0343:             * <p>
0344:             * Get whether this instance should use database-based thread
0345:             * synchronization.
0346:             * </p>
0347:             */
0348:            public boolean getUseDBLocks() {
0349:                return useDBLocks;
0350:            }
0351:
0352:            public boolean isLockOnInsert() {
0353:                return lockOnInsert;
0354:            }
0355:
0356:            /**
0357:             * Whether or not to obtain locks when inserting new jobs/triggers.  
0358:             * Defaults to <code>true</code>, which is safest - some db's (such as 
0359:             * MS SQLServer) seem to require this to avoid deadlocks under high load,
0360:             * while others seem to do fine without.  
0361:             * 
0362:             * <p>Setting this property to <code>false</code> will provide a 
0363:             * significant performance increase during the addition of new jobs 
0364:             * and triggers.</p>
0365:             * 
0366:             * @param lockOnInsert
0367:             */
0368:            public void setLockOnInsert(boolean lockOnInsert) {
0369:                this .lockOnInsert = lockOnInsert;
0370:            }
0371:
0372:            public long getMisfireThreshold() {
0373:                return misfireThreshold;
0374:            }
0375:
0376:            /**
0377:             * The the number of milliseconds by which a trigger must have missed its
0378:             * next-fire-time, in order for it to be considered "misfired" and thus
0379:             * have its misfire instruction applied.
0380:             * 
0381:             * @param misfireThreshold
0382:             */
0383:            public void setMisfireThreshold(long misfireThreshold) {
0384:                if (misfireThreshold < 1) {
0385:                    throw new IllegalArgumentException(
0386:                            "Misfirethreshold must be larger than 0");
0387:                }
0388:                this .misfireThreshold = misfireThreshold;
0389:            }
0390:
0391:            public boolean isDontSetAutoCommitFalse() {
0392:                return dontSetAutoCommitFalse;
0393:            }
0394:
0395:            /**
0396:             * Don't call set autocommit(false) on connections obtained from the
0397:             * DataSource. This can be helpfull in a few situations, such as if you
0398:             * have a driver that complains if it is called when it is already off.
0399:             * 
0400:             * @param b
0401:             */
0402:            public void setDontSetAutoCommitFalse(boolean b) {
0403:                dontSetAutoCommitFalse = b;
0404:            }
0405:
0406:            public boolean isTxIsolationLevelSerializable() {
0407:                return setTxIsolationLevelSequential;
0408:            }
0409:
0410:            /**
0411:             * Set the transaction isolation level of DB connections to sequential.
0412:             * 
0413:             * @param b
0414:             */
0415:            public void setTxIsolationLevelSerializable(boolean b) {
0416:                setTxIsolationLevelSequential = b;
0417:            }
0418:
0419:            /**
0420:             * <p>
0421:             * Set the JDBC driver delegate class.
0422:             * </p>
0423:             * 
0424:             * @param delegateClassName
0425:             *          the delegate class name
0426:             */
0427:            public void setDriverDelegateClass(String delegateClassName)
0428:                    throws InvalidConfigurationException {
0429:                this .delegateClassName = delegateClassName;
0430:            }
0431:
0432:            /**
0433:             * <p>
0434:             * Get the JDBC driver delegate class name.
0435:             * </p>
0436:             * 
0437:             * @return the delegate class name
0438:             */
0439:            public String getDriverDelegateClass() {
0440:                return delegateClassName;
0441:            }
0442:
0443:            public String getSelectWithLockSQL() {
0444:                return selectWithLockSQL;
0445:            }
0446:
0447:            /**
0448:             * <p>
0449:             * set the SQL statement to use to select and lock a row in the "locks"
0450:             * table.
0451:             * </p>
0452:             * 
0453:             * @see StdRowLockSemaphore
0454:             */
0455:            public void setSelectWithLockSQL(String string) {
0456:                selectWithLockSQL = string;
0457:            }
0458:
0459:            protected ClassLoadHelper getClassLoadHelper() {
0460:                return classLoadHelper;
0461:            }
0462:
0463:            /**
0464:             * Get whether the threads spawned by this JobStore should be
0465:             * marked as daemon.  Possible threads include the <code>MisfireHandler</code> 
0466:             * and the <code>ClusterManager</code>.
0467:             * 
0468:             * @see Thread#setDaemon(boolean)
0469:             */
0470:            public boolean getMakeThreadsDaemons() {
0471:                return makeThreadsDaemons;
0472:            }
0473:
0474:            /**
0475:             * Set whether the threads spawned by this JobStore should be
0476:             * marked as daemon.  Possible threads include the <code>MisfireHandler</code> 
0477:             * and the <code>ClusterManager</code>.
0478:             *
0479:             * @see Thread#setDaemon(boolean)
0480:             */
0481:            public void setMakeThreadsDaemons(boolean makeThreadsDaemons) {
0482:                this .makeThreadsDaemons = makeThreadsDaemons;
0483:            }
0484:
0485:            /**
0486:             * Get whether to check to see if there are Triggers that have misfired
0487:             * before actually acquiring the lock to recover them.  This should be 
0488:             * set to false if the majority of the time, there are are misfired
0489:             * Triggers.
0490:             */
0491:            public boolean getDoubleCheckLockMisfireHandler() {
0492:                return doubleCheckLockMisfireHandler;
0493:            }
0494:
0495:            /**
0496:             * Set whether to check to see if there are Triggers that have misfired
0497:             * before actually acquiring the lock to recover them.  This should be 
0498:             * set to false if the majority of the time, there are are misfired
0499:             * Triggers.
0500:             */
0501:            public void setDoubleCheckLockMisfireHandler(
0502:                    boolean doubleCheckLockMisfireHandler) {
0503:                this .doubleCheckLockMisfireHandler = doubleCheckLockMisfireHandler;
0504:            }
0505:
0506:            //---------------------------------------------------------------------------
0507:            // interface methods
0508:            //---------------------------------------------------------------------------
0509:
0510:            protected Log getLog() {
0511:                return log;
0512:            }
0513:
0514:            /**
0515:             * <p>
0516:             * Called by the QuartzScheduler before the <code>JobStore</code> is
0517:             * used, in order to give it a chance to initialize.
0518:             * </p>
0519:             */
0520:            public void initialize(ClassLoadHelper loadHelper,
0521:                    SchedulerSignaler signaler) throws SchedulerConfigException {
0522:
0523:                if (dsName == null) {
0524:                    throw new SchedulerConfigException(
0525:                            "DataSource name not set.");
0526:                }
0527:
0528:                classLoadHelper = loadHelper;
0529:                this .signaler = signaler;
0530:
0531:                // If the user hasn't specified an explicit lock handler, then 
0532:                // choose one based on CMT/Clustered/UseDBLocks.
0533:                if (getLockHandler() == null) {
0534:
0535:                    // If the user hasn't specified an explicit lock handler, 
0536:                    // then we *must* use DB locks with clustering
0537:                    if (isClustered()) {
0538:                        setUseDBLocks(true);
0539:                    }
0540:
0541:                    if (getUseDBLocks()) {
0542:                        getLog()
0543:                                .info(
0544:                                        "Using db table-based data access locking (synchronization).");
0545:                        setLockHandler(new StdRowLockSemaphore(
0546:                                getTablePrefix(), getSelectWithLockSQL()));
0547:                    } else {
0548:                        getLog()
0549:                                .info(
0550:                                        "Using thread monitor-based data access locking (synchronization).");
0551:                        setLockHandler(new SimpleSemaphore());
0552:                    }
0553:                }
0554:
0555:                if (!isClustered()) {
0556:                    try {
0557:                        cleanVolatileTriggerAndJobs();
0558:                    } catch (SchedulerException se) {
0559:                        throw new SchedulerConfigException(
0560:                                "Failure occured during job recovery.", se);
0561:                    }
0562:                }
0563:            }
0564:
0565:            /**
0566:             * @see org.quartz.spi.JobStore#schedulerStarted()
0567:             */
0568:            public void schedulerStarted() throws SchedulerException {
0569:
0570:                if (isClustered()) {
0571:                    clusterManagementThread = new ClusterManager();
0572:                    clusterManagementThread.initialize();
0573:                } else {
0574:                    try {
0575:                        recoverJobs();
0576:                    } catch (SchedulerException se) {
0577:                        throw new SchedulerConfigException(
0578:                                "Failure occured during job recovery.", se);
0579:                    }
0580:                }
0581:
0582:                misfireHandler = new MisfireHandler();
0583:                misfireHandler.initialize();
0584:            }
0585:
0586:            /**
0587:             * <p>
0588:             * Called by the QuartzScheduler to inform the <code>JobStore</code> that
0589:             * it should free up all of it's resources because the scheduler is
0590:             * shutting down.
0591:             * </p>
0592:             */
0593:            public void shutdown() {
0594:                if (clusterManagementThread != null) {
0595:                    clusterManagementThread.shutdown();
0596:                }
0597:
0598:                if (misfireHandler != null) {
0599:                    misfireHandler.shutdown();
0600:                }
0601:
0602:                try {
0603:                    DBConnectionManager.getInstance().shutdown(getDataSource());
0604:                } catch (SQLException sqle) {
0605:                    getLog().warn("Database connection shutdown unsuccessful.",
0606:                            sqle);
0607:                }
0608:            }
0609:
0610:            public boolean supportsPersistence() {
0611:                return true;
0612:            }
0613:
0614:            //---------------------------------------------------------------------------
0615:            // helper methods for subclasses
0616:            //---------------------------------------------------------------------------
0617:
0618:            protected abstract Connection getNonManagedTXConnection()
0619:                    throws JobPersistenceException;
0620:
0621:            /**
0622:             * Wrap the given <code>Connection</code> in a Proxy such that attributes 
0623:             * that might be set will be restored before the connection is closed 
0624:             * (and potentially restored to a pool).
0625:             */
0626:            protected Connection getAttributeRestoringConnection(Connection conn) {
0627:                return (Connection) Proxy
0628:                        .newProxyInstance(
0629:                                Thread.currentThread().getContextClassLoader(),
0630:                                new Class[] { Connection.class },
0631:                                new AttributeRestoringConnectionInvocationHandler(
0632:                                        conn));
0633:            }
0634:
0635:            protected Connection getConnection() throws JobPersistenceException {
0636:                Connection conn = null;
0637:                try {
0638:                    conn = DBConnectionManager.getInstance().getConnection(
0639:                            getDataSource());
0640:                } catch (SQLException sqle) {
0641:                    throw new JobPersistenceException(
0642:                            "Failed to obtain DB connection from data source '"
0643:                                    + getDataSource() + "': " + sqle.toString(),
0644:                            sqle);
0645:                } catch (Throwable e) {
0646:                    throw new JobPersistenceException(
0647:                            "Failed to obtain DB connection from data source '"
0648:                                    + getDataSource() + "': " + e.toString(),
0649:                            e,
0650:                            JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE);
0651:                }
0652:
0653:                if (conn == null) {
0654:                    throw new JobPersistenceException(
0655:                            "Could not get connection from DataSource '"
0656:                                    + getDataSource() + "'");
0657:                }
0658:
0659:                // Protect connection attributes we might change.
0660:                conn = getAttributeRestoringConnection(conn);
0661:
0662:                // Set any connection connection attributes we are to override.
0663:                try {
0664:                    if (!isDontSetAutoCommitFalse()) {
0665:                        conn.setAutoCommit(false);
0666:                    }
0667:
0668:                    if (isTxIsolationLevelSerializable()) {
0669:                        conn
0670:                                .setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
0671:                    }
0672:                } catch (SQLException sqle) {
0673:                    getLog()
0674:                            .warn(
0675:                                    "Failed to override connection auto commit/transaction isolation.",
0676:                                    sqle);
0677:                } catch (Throwable e) {
0678:                    try {
0679:                        conn.close();
0680:                    } catch (Throwable tt) {
0681:                    }
0682:
0683:                    throw new JobPersistenceException(
0684:                            "Failure setting up connection.", e);
0685:                }
0686:
0687:                return conn;
0688:            }
0689:
0690:            protected void releaseLock(Connection conn, String lockName,
0691:                    boolean doIt) {
0692:                if (doIt && conn != null) {
0693:                    try {
0694:                        getLockHandler().releaseLock(conn, lockName);
0695:                    } catch (LockException le) {
0696:                        getLog().error(
0697:                                "Error returning lock: " + le.getMessage(), le);
0698:                    }
0699:                }
0700:            }
0701:
0702:            /**
0703:             * Removes all volatile data.
0704:             * 
0705:             * @throws JobPersistenceException If jobs could not be recovered.
0706:             */
0707:            protected void cleanVolatileTriggerAndJobs()
0708:                    throws JobPersistenceException {
0709:                executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
0710:                        new VoidTransactionCallback() {
0711:                            public void execute(Connection conn)
0712:                                    throws JobPersistenceException {
0713:                                cleanVolatileTriggerAndJobs(conn);
0714:                            }
0715:                        });
0716:            }
0717:
0718:            /**
0719:             * <p>
0720:             * Removes all volatile data.
0721:             * </p>
0722:             * 
0723:             * @throws JobPersistenceException
0724:             *           if jobs could not be recovered
0725:             */
0726:            protected void cleanVolatileTriggerAndJobs(Connection conn)
0727:                    throws JobPersistenceException {
0728:                try {
0729:                    // find volatile jobs & triggers...
0730:                    Key[] volatileTriggers = getDelegate()
0731:                            .selectVolatileTriggers(conn);
0732:                    Key[] volatileJobs = getDelegate().selectVolatileJobs(conn);
0733:
0734:                    for (int i = 0; i < volatileTriggers.length; i++) {
0735:                        removeTrigger(conn, null,
0736:                                volatileTriggers[i].getName(),
0737:                                volatileTriggers[i].getGroup());
0738:                    }
0739:                    getLog().info(
0740:                            "Removed " + volatileTriggers.length
0741:                                    + " Volatile Trigger(s).");
0742:
0743:                    for (int i = 0; i < volatileJobs.length; i++) {
0744:                        removeJob(conn, null, volatileJobs[i].getName(),
0745:                                volatileJobs[i].getGroup(), true);
0746:                    }
0747:                    getLog().info(
0748:                            "Removed " + volatileJobs.length
0749:                                    + " Volatile Job(s).");
0750:
0751:                    // clean up any fired trigger entries
0752:                    getDelegate().deleteVolatileFiredTriggers(conn);
0753:
0754:                } catch (Exception e) {
0755:                    throw new JobPersistenceException(
0756:                            "Couldn't clean volatile data: " + e.getMessage(),
0757:                            e);
0758:                }
0759:            }
0760:
0761:            /**
0762:             * Recover any failed or misfired jobs and clean up the data store as
0763:             * appropriate.
0764:             * 
0765:             * @throws JobPersistenceException if jobs could not be recovered
0766:             */
0767:            protected void recoverJobs() throws JobPersistenceException {
0768:                executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
0769:                        new VoidTransactionCallback() {
0770:                            public void execute(Connection conn)
0771:                                    throws JobPersistenceException {
0772:                                recoverJobs(conn);
0773:                            }
0774:                        });
0775:            }
0776:
0777:            /**
0778:             * <p>
0779:             * Will recover any failed or misfired jobs and clean up the data store as
0780:             * appropriate.
0781:             * </p>
0782:             * 
0783:             * @throws JobPersistenceException
0784:             *           if jobs could not be recovered
0785:             */
0786:            protected void recoverJobs(Connection conn)
0787:                    throws JobPersistenceException {
0788:                try {
0789:                    // update inconsistent job states
0790:                    int rows = getDelegate()
0791:                            .updateTriggerStatesFromOtherStates(conn,
0792:                                    STATE_WAITING, STATE_ACQUIRED,
0793:                                    STATE_BLOCKED);
0794:
0795:                    rows += getDelegate().updateTriggerStatesFromOtherStates(
0796:                            conn, STATE_PAUSED, STATE_PAUSED_BLOCKED,
0797:                            STATE_PAUSED_BLOCKED);
0798:
0799:                    getLog()
0800:                            .info(
0801:                                    "Freed "
0802:                                            + rows
0803:                                            + " triggers from 'acquired' / 'blocked' state.");
0804:
0805:                    // clean up misfired jobs
0806:                    recoverMisfiredJobs(conn, true);
0807:
0808:                    // recover jobs marked for recovery that were not fully executed
0809:                    Trigger[] recoveringJobTriggers = getDelegate()
0810:                            .selectTriggersForRecoveringJobs(conn);
0811:                    getLog()
0812:                            .info(
0813:                                    "Recovering "
0814:                                            + recoveringJobTriggers.length
0815:                                            + " jobs that were in-progress at the time of the last shut-down.");
0816:
0817:                    for (int i = 0; i < recoveringJobTriggers.length; ++i) {
0818:                        if (jobExists(conn, recoveringJobTriggers[i]
0819:                                .getJobName(), recoveringJobTriggers[i]
0820:                                .getJobGroup())) {
0821:                            recoveringJobTriggers[i].computeFirstFireTime(null);
0822:                            storeTrigger(conn, null, recoveringJobTriggers[i],
0823:                                    null, false, STATE_WAITING, false, true);
0824:                        }
0825:                    }
0826:                    getLog().info("Recovery complete.");
0827:
0828:                    // remove lingering 'complete' triggers...
0829:                    Key[] ct = getDelegate().selectTriggersInState(conn,
0830:                            STATE_COMPLETE);
0831:                    for (int i = 0; ct != null && i < ct.length; i++) {
0832:                        removeTrigger(conn, null, ct[i].getName(), ct[i]
0833:                                .getGroup());
0834:                    }
0835:                    getLog().info(
0836:                            "Removed " + ct.length + " 'complete' triggers.");
0837:
0838:                    // clean up any fired trigger entries
0839:                    int n = getDelegate().deleteFiredTriggers(conn);
0840:                    getLog().info("Removed " + n + " stale fired job entries.");
0841:                } catch (JobPersistenceException e) {
0842:                    throw e;
0843:                } catch (Exception e) {
0844:                    throw new JobPersistenceException("Couldn't recover jobs: "
0845:                            + e.getMessage(), e);
0846:                }
0847:            }
0848:
0849:            protected long getMisfireTime() {
0850:                long misfireTime = System.currentTimeMillis();
0851:                if (getMisfireThreshold() > 0) {
0852:                    misfireTime -= getMisfireThreshold();
0853:                }
0854:
0855:                return (misfireTime > 0) ? misfireTime : 0;
0856:            }
0857:
0858:            /**
0859:             * Helper class for returning the composite result of trying
0860:             * to recover misfired jobs.
0861:             */
0862:            protected static class RecoverMisfiredJobsResult {
0863:                public static final RecoverMisfiredJobsResult NO_OP = new RecoverMisfiredJobsResult(
0864:                        false, 0);
0865:
0866:                private boolean _hasMoreMisfiredTriggers;
0867:                private int _processedMisfiredTriggerCount;
0868:
0869:                public RecoverMisfiredJobsResult(
0870:                        boolean hasMoreMisfiredTriggers,
0871:                        int processedMisfiredTriggerCount) {
0872:                    _hasMoreMisfiredTriggers = hasMoreMisfiredTriggers;
0873:                    _processedMisfiredTriggerCount = processedMisfiredTriggerCount;
0874:                }
0875:
0876:                public boolean hasMoreMisfiredTriggers() {
0877:                    return _hasMoreMisfiredTriggers;
0878:                }
0879:
0880:                public int getProcessedMisfiredTriggerCount() {
0881:                    return _processedMisfiredTriggerCount;
0882:                }
0883:            }
0884:
0885:            protected RecoverMisfiredJobsResult recoverMisfiredJobs(
0886:                    Connection conn, boolean recovering)
0887:                    throws JobPersistenceException, SQLException {
0888:
0889:                // If recovering, we want to handle all of the misfired
0890:                // triggers right away.
0891:                int maxMisfiresToHandleAtATime = (recovering) ? -1
0892:                        : getMaxMisfiresToHandleAtATime();
0893:
0894:                List misfiredTriggers = new ArrayList();
0895:
0896:                // We must still look for the MISFIRED state in case triggers were left 
0897:                // in this state when upgrading to this version that does not support it. 
0898:                boolean hasMoreMisfiredTriggers = getDelegate()
0899:                        .selectMisfiredTriggersInStates(conn, STATE_MISFIRED,
0900:                                STATE_WAITING, getMisfireTime(),
0901:                                maxMisfiresToHandleAtATime, misfiredTriggers);
0902:
0903:                if (hasMoreMisfiredTriggers) {
0904:                    getLog()
0905:                            .info(
0906:                                    "Handling the first "
0907:                                            + misfiredTriggers.size()
0908:                                            + " triggers that missed their scheduled fire-time.  "
0909:                                            + "More misfired triggers remain to be processed.");
0910:                } else if (misfiredTriggers.size() > 0) {
0911:                    getLog()
0912:                            .info(
0913:                                    "Handling "
0914:                                            + misfiredTriggers.size()
0915:                                            + " trigger(s) that missed their scheduled fire-time.");
0916:                } else {
0917:                    getLog()
0918:                            .debug(
0919:                                    "Found 0 triggers that missed their scheduled fire-time.");
0920:                    return RecoverMisfiredJobsResult.NO_OP;
0921:                }
0922:
0923:                for (Iterator misfiredTriggerIter = misfiredTriggers.iterator(); misfiredTriggerIter
0924:                        .hasNext();) {
0925:                    Key triggerKey = (Key) misfiredTriggerIter.next();
0926:
0927:                    Trigger trig = retrieveTrigger(conn, triggerKey.getName(),
0928:                            triggerKey.getGroup());
0929:
0930:                    if (trig == null) {
0931:                        continue;
0932:                    }
0933:
0934:                    doUpdateOfMisfiredTrigger(conn, null, trig, false,
0935:                            STATE_WAITING, recovering);
0936:                }
0937:
0938:                return new RecoverMisfiredJobsResult(hasMoreMisfiredTriggers,
0939:                        misfiredTriggers.size());
0940:            }
0941:
0942:            protected boolean updateMisfiredTrigger(Connection conn,
0943:                    SchedulingContext ctxt, String triggerName,
0944:                    String groupName, String newStateIfNotComplete,
0945:                    boolean forceState) // TODO: probably
0946:                    // get rid of
0947:                    // this
0948:                    throws JobPersistenceException {
0949:                try {
0950:
0951:                    Trigger trig = getDelegate().selectTrigger(conn,
0952:                            triggerName, groupName);
0953:
0954:                    long misfireTime = System.currentTimeMillis();
0955:                    if (getMisfireThreshold() > 0) {
0956:                        misfireTime -= getMisfireThreshold();
0957:                    }
0958:
0959:                    if (trig.getNextFireTime().getTime() > misfireTime) {
0960:                        return false;
0961:                    }
0962:
0963:                    doUpdateOfMisfiredTrigger(conn, ctxt, trig, forceState,
0964:                            newStateIfNotComplete, false);
0965:
0966:                    return true;
0967:
0968:                } catch (Exception e) {
0969:                    throw new JobPersistenceException(
0970:                            "Couldn't update misfired trigger '" + groupName
0971:                                    + "." + triggerName + "': "
0972:                                    + e.getMessage(), e);
0973:                }
0974:            }
0975:
0976:            private void doUpdateOfMisfiredTrigger(Connection conn,
0977:                    SchedulingContext ctxt, Trigger trig, boolean forceState,
0978:                    String newStateIfNotComplete, boolean recovering)
0979:                    throws JobPersistenceException {
0980:                Calendar cal = null;
0981:                if (trig.getCalendarName() != null) {
0982:                    cal = retrieveCalendar(conn, ctxt, trig.getCalendarName());
0983:                }
0984:
0985:                signaler.notifyTriggerListenersMisfired(trig);
0986:
0987:                trig.updateAfterMisfire(cal);
0988:
0989:                if (trig.getNextFireTime() == null) {
0990:                    storeTrigger(conn, ctxt, trig, null, true, STATE_COMPLETE,
0991:                            forceState, recovering);
0992:                } else {
0993:                    storeTrigger(conn, ctxt, trig, null, true,
0994:                            newStateIfNotComplete, forceState, false);
0995:                }
0996:            }
0997:
0998:            /**
0999:             * <p>
1000:             * Store the given <code>{@link org.quartz.JobDetail}</code> and <code>{@link org.quartz.Trigger}</code>.
1001:             * </p>
1002:             * 
1003:             * @param newJob
1004:             *          The <code>JobDetail</code> to be stored.
1005:             * @param newTrigger
1006:             *          The <code>Trigger</code> to be stored.
1007:             * @throws ObjectAlreadyExistsException
1008:             *           if a <code>Job</code> with the same name/group already
1009:             *           exists.
1010:             */
1011:            public void storeJobAndTrigger(final SchedulingContext ctxt,
1012:                    final JobDetail newJob, final Trigger newTrigger)
1013:                    throws ObjectAlreadyExistsException,
1014:                    JobPersistenceException {
1015:                executeInLock((isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
1016:                        new VoidTransactionCallback() {
1017:                            public void execute(Connection conn)
1018:                                    throws JobPersistenceException {
1019:                                if (newJob.isVolatile()
1020:                                        && !newTrigger.isVolatile()) {
1021:                                    JobPersistenceException jpe = new JobPersistenceException(
1022:                                            "Cannot associate non-volatile trigger with a volatile job!");
1023:                                    jpe
1024:                                            .setErrorCode(SchedulerException.ERR_CLIENT_ERROR);
1025:                                    throw jpe;
1026:                                }
1027:
1028:                                storeJob(conn, ctxt, newJob, false);
1029:                                storeTrigger(conn, ctxt, newTrigger, newJob,
1030:                                        false, Constants.STATE_WAITING, false,
1031:                                        false);
1032:                            }
1033:                        });
1034:            }
1035:
1036:            /**
1037:             * <p>
1038:             * Store the given <code>{@link org.quartz.JobDetail}</code>.
1039:             * </p>
1040:             * 
1041:             * @param newJob
1042:             *          The <code>JobDetail</code> to be stored.
1043:             * @param replaceExisting
1044:             *          If <code>true</code>, any <code>Job</code> existing in the
1045:             *          <code>JobStore</code> with the same name & group should be
1046:             *          over-written.
1047:             * @throws ObjectAlreadyExistsException
1048:             *           if a <code>Job</code> with the same name/group already
1049:             *           exists, and replaceExisting is set to false.
1050:             */
1051:            public void storeJob(final SchedulingContext ctxt,
1052:                    final JobDetail newJob, final boolean replaceExisting)
1053:                    throws ObjectAlreadyExistsException,
1054:                    JobPersistenceException {
1055:                executeInLock(
1056:                        (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS
1057:                                : null, new VoidTransactionCallback() {
1058:                            public void execute(Connection conn)
1059:                                    throws JobPersistenceException {
1060:                                storeJob(conn, ctxt, newJob, replaceExisting);
1061:                            }
1062:                        });
1063:            }
1064:
1065:            /**
1066:             * <p>
1067:             * Insert or update a job.
1068:             * </p>
1069:             */
1070:            protected void storeJob(Connection conn, SchedulingContext ctxt,
1071:                    JobDetail newJob, boolean replaceExisting)
1072:                    throws ObjectAlreadyExistsException,
1073:                    JobPersistenceException {
1074:                if (newJob.isVolatile() && isClustered()) {
1075:                    getLog()
1076:                            .info(
1077:                                    "note: volatile jobs are effectively non-volatile in a clustered environment.");
1078:                }
1079:
1080:                boolean existingJob = jobExists(conn, newJob.getName(), newJob
1081:                        .getGroup());
1082:                try {
1083:                    if (existingJob) {
1084:                        if (!replaceExisting) {
1085:                            throw new ObjectAlreadyExistsException(newJob);
1086:                        }
1087:                        getDelegate().updateJobDetail(conn, newJob);
1088:                    } else {
1089:                        getDelegate().insertJobDetail(conn, newJob);
1090:                    }
1091:                } catch (IOException e) {
1092:                    throw new JobPersistenceException("Couldn't store job: "
1093:                            + e.getMessage(), e);
1094:                } catch (SQLException e) {
1095:                    throw new JobPersistenceException("Couldn't store job: "
1096:                            + e.getMessage(), e);
1097:                }
1098:            }
1099:
1100:            /**
1101:             * <p>
1102:             * Check existence of a given job.
1103:             * </p>
1104:             */
1105:            protected boolean jobExists(Connection conn, String jobName,
1106:                    String groupName) throws JobPersistenceException {
1107:                try {
1108:                    return getDelegate().jobExists(conn, jobName, groupName);
1109:                } catch (SQLException e) {
1110:                    throw new JobPersistenceException(
1111:                            "Couldn't determine job existence (" + groupName
1112:                                    + "." + jobName + "): " + e.getMessage(), e);
1113:                }
1114:            }
1115:
1116:            /**
1117:             * <p>
1118:             * Store the given <code>{@link org.quartz.Trigger}</code>.
1119:             * </p>
1120:             * 
1121:             * @param newTrigger
1122:             *          The <code>Trigger</code> to be stored.
1123:             * @param replaceExisting
1124:             *          If <code>true</code>, any <code>Trigger</code> existing in
1125:             *          the <code>JobStore</code> with the same name & group should
1126:             *          be over-written.
1127:             * @throws ObjectAlreadyExistsException
1128:             *           if a <code>Trigger</code> with the same name/group already
1129:             *           exists, and replaceExisting is set to false.
1130:             */
1131:            public void storeTrigger(final SchedulingContext ctxt,
1132:                    final Trigger newTrigger, final boolean replaceExisting)
1133:                    throws ObjectAlreadyExistsException,
1134:                    JobPersistenceException {
1135:                executeInLock(
1136:                        (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS
1137:                                : null, new VoidTransactionCallback() {
1138:                            public void execute(Connection conn)
1139:                                    throws JobPersistenceException {
1140:                                storeTrigger(conn, ctxt, newTrigger, null,
1141:                                        replaceExisting, STATE_WAITING, false,
1142:                                        false);
1143:                            }
1144:                        });
1145:            }
1146:
1147:            /**
1148:             * <p>
1149:             * Insert or update a trigger.
1150:             * </p>
1151:             */
1152:            protected void storeTrigger(Connection conn,
1153:                    SchedulingContext ctxt, Trigger newTrigger, JobDetail job,
1154:                    boolean replaceExisting, String state, boolean forceState,
1155:                    boolean recovering) throws ObjectAlreadyExistsException,
1156:                    JobPersistenceException {
1157:                if (newTrigger.isVolatile() && isClustered()) {
1158:                    getLog()
1159:                            .info(
1160:                                    "note: volatile triggers are effectively non-volatile in a clustered environment.");
1161:                }
1162:
1163:                boolean existingTrigger = triggerExists(conn, newTrigger
1164:                        .getName(), newTrigger.getGroup());
1165:
1166:                if ((existingTrigger) && (!replaceExisting)) {
1167:                    throw new ObjectAlreadyExistsException(newTrigger);
1168:                }
1169:
1170:                try {
1171:
1172:                    boolean shouldBepaused = false;
1173:
1174:                    if (!forceState) {
1175:                        shouldBepaused = getDelegate().isTriggerGroupPaused(
1176:                                conn, newTrigger.getGroup());
1177:
1178:                        if (!shouldBepaused) {
1179:                            shouldBepaused = getDelegate()
1180:                                    .isTriggerGroupPaused(conn,
1181:                                            ALL_GROUPS_PAUSED);
1182:
1183:                            if (shouldBepaused) {
1184:                                getDelegate().insertPausedTriggerGroup(conn,
1185:                                        newTrigger.getGroup());
1186:                            }
1187:                        }
1188:
1189:                        if (shouldBepaused
1190:                                && (state.equals(STATE_WAITING) || state
1191:                                        .equals(STATE_ACQUIRED))) {
1192:                            state = STATE_PAUSED;
1193:                        }
1194:                    }
1195:
1196:                    if (job == null) {
1197:                        job = getDelegate().selectJobDetail(conn,
1198:                                newTrigger.getJobName(),
1199:                                newTrigger.getJobGroup(), getClassLoadHelper());
1200:                    }
1201:                    if (job == null) {
1202:                        throw new JobPersistenceException("The job ("
1203:                                + newTrigger.getFullJobName()
1204:                                + ") referenced by the trigger does not exist.");
1205:                    }
1206:                    if (job.isVolatile() && !newTrigger.isVolatile()) {
1207:                        throw new JobPersistenceException(
1208:                                "It does not make sense to "
1209:                                        + "associate a non-volatile Trigger with a volatile Job!");
1210:                    }
1211:
1212:                    if (job.isStateful() && !recovering) {
1213:                        state = checkBlockedState(conn, ctxt, job.getName(),
1214:                                job.getGroup(), state);
1215:                    }
1216:
1217:                    if (existingTrigger) {
1218:                        if (newTrigger.getClass() == SimpleTrigger.class) {
1219:                            getDelegate().updateSimpleTrigger(conn,
1220:                                    (SimpleTrigger) newTrigger);
1221:                        } else if (newTrigger.getClass() == CronTrigger.class) {
1222:                            getDelegate().updateCronTrigger(conn,
1223:                                    (CronTrigger) newTrigger);
1224:                        } else {
1225:                            getDelegate().updateBlobTrigger(conn, newTrigger);
1226:                        }
1227:                        getDelegate().updateTrigger(conn, newTrigger, state,
1228:                                job);
1229:                    } else {
1230:                        getDelegate().insertTrigger(conn, newTrigger, state,
1231:                                job);
1232:                        if (newTrigger.getClass() == SimpleTrigger.class) {
1233:                            getDelegate().insertSimpleTrigger(conn,
1234:                                    (SimpleTrigger) newTrigger);
1235:                        } else if (newTrigger.getClass() == CronTrigger.class) {
1236:                            getDelegate().insertCronTrigger(conn,
1237:                                    (CronTrigger) newTrigger);
1238:                        } else {
1239:                            getDelegate().insertBlobTrigger(conn, newTrigger);
1240:                        }
1241:                    }
1242:                } catch (Exception e) {
1243:                    throw new JobPersistenceException(
1244:                            "Couldn't store trigger: " + e.getMessage(), e);
1245:                }
1246:            }
1247:
1248:            /**
1249:             * <p>
1250:             * Check existence of a given trigger.
1251:             * </p>
1252:             */
1253:            protected boolean triggerExists(Connection conn,
1254:                    String triggerName, String groupName)
1255:                    throws JobPersistenceException {
1256:                try {
1257:                    return getDelegate().triggerExists(conn, triggerName,
1258:                            groupName);
1259:                } catch (SQLException e) {
1260:                    throw new JobPersistenceException(
1261:                            "Couldn't determine trigger existence ("
1262:                                    + groupName + "." + triggerName + "): "
1263:                                    + e.getMessage(), e);
1264:                }
1265:            }
1266:
1267:            /**
1268:             * <p>
1269:             * Remove (delete) the <code>{@link org.quartz.Job}</code> with the given
1270:             * name, and any <code>{@link org.quartz.Trigger}</code> s that reference
1271:             * it.
1272:             * </p>
1273:             * 
1274:             * <p>
1275:             * If removal of the <code>Job</code> results in an empty group, the
1276:             * group should be removed from the <code>JobStore</code>'s list of
1277:             * known group names.
1278:             * </p>
1279:             * 
1280:             * @param jobName
1281:             *          The name of the <code>Job</code> to be removed.
1282:             * @param groupName
1283:             *          The group name of the <code>Job</code> to be removed.
1284:             * @return <code>true</code> if a <code>Job</code> with the given name &
1285:             *         group was found and removed from the store.
1286:             */
1287:            public boolean removeJob(final SchedulingContext ctxt,
1288:                    final String jobName, final String groupName)
1289:                    throws JobPersistenceException {
1290:                return ((Boolean) executeInLock(LOCK_TRIGGER_ACCESS,
1291:                        new TransactionCallback() {
1292:                            public Object execute(Connection conn)
1293:                                    throws JobPersistenceException {
1294:                                return removeJob(conn, ctxt, jobName,
1295:                                        groupName, true) ? Boolean.TRUE
1296:                                        : Boolean.FALSE;
1297:                            }
1298:                        })).booleanValue();
1299:            }
1300:
1301:            protected boolean removeJob(Connection conn,
1302:                    SchedulingContext ctxt, String jobName, String groupName,
1303:                    boolean activeDeleteSafe) throws JobPersistenceException {
1304:
1305:                try {
1306:                    Key[] jobTriggers = getDelegate().selectTriggerNamesForJob(
1307:                            conn, jobName, groupName);
1308:                    for (int i = 0; i < jobTriggers.length; ++i) {
1309:                        deleteTriggerAndChildren(conn,
1310:                                jobTriggers[i].getName(), jobTriggers[i]
1311:                                        .getGroup());
1312:                    }
1313:
1314:                    return deleteJobAndChildren(conn, ctxt, jobName, groupName);
1315:                } catch (SQLException e) {
1316:                    throw new JobPersistenceException("Couldn't remove job: "
1317:                            + e.getMessage(), e);
1318:                }
1319:            }
1320:
1321:            /**
1322:             * Delete a job and its listeners.
1323:             * 
1324:             * @see #removeJob(Connection, SchedulingContext, String, String, boolean)
1325:             * @see #removeTrigger(Connection, SchedulingContext, String, String)
1326:             */
1327:            private boolean deleteJobAndChildren(Connection conn,
1328:                    SchedulingContext ctxt, String jobName, String groupName)
1329:                    throws NoSuchDelegateException, SQLException {
1330:                getDelegate().deleteJobListeners(conn, jobName, groupName);
1331:
1332:                return (getDelegate().deleteJobDetail(conn, jobName, groupName) > 0);
1333:            }
1334:
1335:            /**
1336:             * Delete a trigger, its listeners, and its Simple/Cron/BLOB sub-table entry.
1337:             * 
1338:             * @see #removeJob(Connection, SchedulingContext, String, String, boolean)
1339:             * @see #removeTrigger(Connection, SchedulingContext, String, String)
1340:             * @see #replaceTrigger(Connection, SchedulingContext, String, String, Trigger)
1341:             */
1342:            private boolean deleteTriggerAndChildren(Connection conn,
1343:                    String triggerName, String triggerGroupName)
1344:                    throws SQLException, NoSuchDelegateException {
1345:                DriverDelegate delegate = getDelegate();
1346:
1347:                // Once it succeeds in deleting one sub-table entry it will not try the others.
1348:                if ((delegate.deleteSimpleTrigger(conn, triggerName,
1349:                        triggerGroupName) == 0)
1350:                        && (delegate.deleteCronTrigger(conn, triggerName,
1351:                                triggerGroupName) == 0)) {
1352:                    delegate.deleteBlobTrigger(conn, triggerName,
1353:                            triggerGroupName);
1354:                }
1355:
1356:                delegate.deleteTriggerListeners(conn, triggerName,
1357:                        triggerGroupName);
1358:
1359:                return (delegate.deleteTrigger(conn, triggerName,
1360:                        triggerGroupName) > 0);
1361:            }
1362:
1363:            /**
1364:             * <p>
1365:             * Retrieve the <code>{@link org.quartz.JobDetail}</code> for the given
1366:             * <code>{@link org.quartz.Job}</code>.
1367:             * </p>
1368:             * 
1369:             * @param jobName
1370:             *          The name of the <code>Job</code> to be retrieved.
1371:             * @param groupName
1372:             *          The group name of the <code>Job</code> to be retrieved.
1373:             * @return The desired <code>Job</code>, or null if there is no match.
1374:             */
1375:            public JobDetail retrieveJob(final SchedulingContext ctxt,
1376:                    final String jobName, final String groupName)
1377:                    throws JobPersistenceException {
1378:                return (JobDetail) executeWithoutLock( // no locks necessary for read...
1379:                new TransactionCallback() {
1380:                    public Object execute(Connection conn)
1381:                            throws JobPersistenceException {
1382:                        return retrieveJob(conn, ctxt, jobName, groupName);
1383:                    }
1384:                });
1385:            }
1386:
1387:            protected JobDetail retrieveJob(Connection conn,
1388:                    SchedulingContext ctxt, String jobName, String groupName)
1389:                    throws JobPersistenceException {
1390:                try {
1391:                    JobDetail job = getDelegate().selectJobDetail(conn,
1392:                            jobName, groupName, getClassLoadHelper());
1393:                    if (job != null) {
1394:                        String[] listeners = getDelegate().selectJobListeners(
1395:                                conn, jobName, groupName);
1396:                        for (int i = 0; i < listeners.length; ++i) {
1397:                            job.addJobListener(listeners[i]);
1398:                        }
1399:                    }
1400:
1401:                    return job;
1402:                } catch (ClassNotFoundException e) {
1403:                    throw new JobPersistenceException(
1404:                            "Couldn't retrieve job because a required class was not found: "
1405:                                    + e.getMessage(),
1406:                            e,
1407:                            SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST);
1408:                } catch (IOException e) {
1409:                    throw new JobPersistenceException(
1410:                            "Couldn't retrieve job because the BLOB couldn't be deserialized: "
1411:                                    + e.getMessage(),
1412:                            e,
1413:                            SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST);
1414:                } catch (SQLException e) {
1415:                    throw new JobPersistenceException("Couldn't retrieve job: "
1416:                            + e.getMessage(), e);
1417:                }
1418:            }
1419:
1420:            /**
1421:             * <p>
1422:             * Remove (delete) the <code>{@link org.quartz.Trigger}</code> with the
1423:             * given name.
1424:             * </p>
1425:             * 
1426:             * <p>
1427:             * If removal of the <code>Trigger</code> results in an empty group, the
1428:             * group should be removed from the <code>JobStore</code>'s list of
1429:             * known group names.
1430:             * </p>
1431:             * 
1432:             * <p>
1433:             * If removal of the <code>Trigger</code> results in an 'orphaned' <code>Job</code>
1434:             * that is not 'durable', then the <code>Job</code> should be deleted
1435:             * also.
1436:             * </p>
1437:             * 
1438:             * @param triggerName
1439:             *          The name of the <code>Trigger</code> to be removed.
1440:             * @param groupName
1441:             *          The group name of the <code>Trigger</code> to be removed.
1442:             * @return <code>true</code> if a <code>Trigger</code> with the given
1443:             *         name & group was found and removed from the store.
1444:             */
1445:            public boolean removeTrigger(final SchedulingContext ctxt,
1446:                    final String triggerName, final String groupName)
1447:                    throws JobPersistenceException {
1448:                return ((Boolean) executeInLock(LOCK_TRIGGER_ACCESS,
1449:                        new TransactionCallback() {
1450:                            public Object execute(Connection conn)
1451:                                    throws JobPersistenceException {
1452:                                return removeTrigger(conn, ctxt, triggerName,
1453:                                        groupName) ? Boolean.TRUE
1454:                                        : Boolean.FALSE;
1455:                            }
1456:                        })).booleanValue();
1457:            }
1458:
1459:            protected boolean removeTrigger(Connection conn,
1460:                    SchedulingContext ctxt, String triggerName, String groupName)
1461:                    throws JobPersistenceException {
1462:                boolean removedTrigger = false;
1463:                try {
1464:                    // this must be called before we delete the trigger, obviously
1465:                    JobDetail job = getDelegate().selectJobForTrigger(conn,
1466:                            triggerName, groupName, getClassLoadHelper());
1467:
1468:                    removedTrigger = deleteTriggerAndChildren(conn,
1469:                            triggerName, groupName);
1470:
1471:                    if (null != job && !job.isDurable()) {
1472:                        int numTriggers = getDelegate()
1473:                                .selectNumTriggersForJob(conn, job.getName(),
1474:                                        job.getGroup());
1475:                        if (numTriggers == 0) {
1476:                            // Don't call removeJob() because we don't want to check for
1477:                            // triggers again.
1478:                            deleteJobAndChildren(conn, ctxt, job.getName(), job
1479:                                    .getGroup());
1480:                        }
1481:                    }
1482:                } catch (ClassNotFoundException e) {
1483:                    throw new JobPersistenceException(
1484:                            "Couldn't remove trigger: " + e.getMessage(), e);
1485:                } catch (SQLException e) {
1486:                    throw new JobPersistenceException(
1487:                            "Couldn't remove trigger: " + e.getMessage(), e);
1488:                }
1489:
1490:                return removedTrigger;
1491:            }
1492:
1493:            /** 
1494:             * @see org.quartz.spi.JobStore#replaceTrigger(org.quartz.core.SchedulingContext, java.lang.String, java.lang.String, org.quartz.Trigger)
1495:             */
1496:            public boolean replaceTrigger(final SchedulingContext ctxt,
1497:                    final String triggerName, final String groupName,
1498:                    final Trigger newTrigger) throws JobPersistenceException {
1499:                return ((Boolean) executeInLock(LOCK_TRIGGER_ACCESS,
1500:                        new TransactionCallback() {
1501:                            public Object execute(Connection conn)
1502:                                    throws JobPersistenceException {
1503:                                return replaceTrigger(conn, ctxt, triggerName,
1504:                                        groupName, newTrigger) ? Boolean.TRUE
1505:                                        : Boolean.FALSE;
1506:                            }
1507:                        })).booleanValue();
1508:            }
1509:
1510:            protected boolean replaceTrigger(Connection conn,
1511:                    SchedulingContext ctxt, String triggerName,
1512:                    String groupName, Trigger newTrigger)
1513:                    throws JobPersistenceException {
1514:                try {
1515:                    // this must be called before we delete the trigger, obviously
1516:                    JobDetail job = getDelegate().selectJobForTrigger(conn,
1517:                            triggerName, groupName, getClassLoadHelper());
1518:
1519:                    if (job == null) {
1520:                        return false;
1521:                    }
1522:
1523:                    if (!newTrigger.getJobName().equals(job.getName())
1524:                            || !newTrigger.getJobGroup().equals(job.getGroup())) {
1525:                        throw new JobPersistenceException(
1526:                                "New trigger is not related to the same job as the old trigger.");
1527:                    }
1528:
1529:                    boolean removedTrigger = deleteTriggerAndChildren(conn,
1530:                            triggerName, groupName);
1531:
1532:                    storeTrigger(conn, ctxt, newTrigger, job, false,
1533:                            STATE_WAITING, false, false);
1534:
1535:                    return removedTrigger;
1536:                } catch (ClassNotFoundException e) {
1537:                    throw new JobPersistenceException(
1538:                            "Couldn't remove trigger: " + e.getMessage(), e);
1539:                } catch (SQLException e) {
1540:                    throw new JobPersistenceException(
1541:                            "Couldn't remove trigger: " + e.getMessage(), e);
1542:                }
1543:            }
1544:
1545:            /**
1546:             * <p>
1547:             * Retrieve the given <code>{@link org.quartz.Trigger}</code>.
1548:             * </p>
1549:             * 
1550:             * @param triggerName
1551:             *          The name of the <code>Trigger</code> to be retrieved.
1552:             * @param groupName
1553:             *          The group name of the <code>Trigger</code> to be retrieved.
1554:             * @return The desired <code>Trigger</code>, or null if there is no
1555:             *         match.
1556:             */
1557:            public Trigger retrieveTrigger(final SchedulingContext ctxt,
1558:                    final String triggerName, final String groupName)
1559:                    throws JobPersistenceException {
1560:                return (Trigger) executeWithoutLock( // no locks necessary for read...
1561:                new TransactionCallback() {
1562:                    public Object execute(Connection conn)
1563:                            throws JobPersistenceException {
1564:                        return retrieveTrigger(conn, ctxt, triggerName,
1565:                                groupName);
1566:                    }
1567:                });
1568:            }
1569:
1570:            protected Trigger retrieveTrigger(Connection conn,
1571:                    SchedulingContext ctxt, String triggerName, String groupName)
1572:                    throws JobPersistenceException {
1573:                return retrieveTrigger(conn, triggerName, groupName);
1574:            }
1575:
1576:            protected Trigger retrieveTrigger(Connection conn,
1577:                    String triggerName, String groupName)
1578:                    throws JobPersistenceException {
1579:                try {
1580:                    Trigger trigger = getDelegate().selectTrigger(conn,
1581:                            triggerName, groupName);
1582:                    if (trigger == null) {
1583:                        return null;
1584:                    }
1585:
1586:                    // In case Trigger was BLOB, clear out any listeners that might 
1587:                    // have been serialized.
1588:                    trigger.clearAllTriggerListeners();
1589:
1590:                    String[] listeners = getDelegate().selectTriggerListeners(
1591:                            conn, triggerName, groupName);
1592:                    for (int i = 0; i < listeners.length; ++i) {
1593:                        trigger.addTriggerListener(listeners[i]);
1594:                    }
1595:
1596:                    return trigger;
1597:                } catch (Exception e) {
1598:                    throw new JobPersistenceException(
1599:                            "Couldn't retrieve trigger: " + e.getMessage(), e);
1600:                }
1601:            }
1602:
1603:            /**
1604:             * <p>
1605:             * Get the current state of the identified <code>{@link Trigger}</code>.
1606:             * </p>
1607:             * 
1608:             * @see Trigger#STATE_NORMAL
1609:             * @see Trigger#STATE_PAUSED
1610:             * @see Trigger#STATE_COMPLETE
1611:             * @see Trigger#STATE_ERROR
1612:             * @see Trigger#STATE_NONE
1613:             */
1614:            public int getTriggerState(final SchedulingContext ctxt,
1615:                    final String triggerName, final String groupName)
1616:                    throws JobPersistenceException {
1617:                return ((Integer) executeWithoutLock( // no locks necessary for read...
1618:                new TransactionCallback() {
1619:                    public Object execute(Connection conn)
1620:                            throws JobPersistenceException {
1621:                        return new Integer(getTriggerState(conn, ctxt,
1622:                                triggerName, groupName));
1623:                    }
1624:                })).intValue();
1625:            }
1626:
1627:            public int getTriggerState(Connection conn, SchedulingContext ctxt,
1628:                    String triggerName, String groupName)
1629:                    throws JobPersistenceException {
1630:                try {
1631:                    String ts = getDelegate().selectTriggerState(conn,
1632:                            triggerName, groupName);
1633:
1634:                    if (ts == null) {
1635:                        return Trigger.STATE_NONE;
1636:                    }
1637:
1638:                    if (ts.equals(STATE_DELETED)) {
1639:                        return Trigger.STATE_NONE;
1640:                    }
1641:
1642:                    if (ts.equals(STATE_COMPLETE)) {
1643:                        return Trigger.STATE_COMPLETE;
1644:                    }
1645:
1646:                    if (ts.equals(STATE_PAUSED)) {
1647:                        return Trigger.STATE_PAUSED;
1648:                    }
1649:
1650:                    if (ts.equals(STATE_PAUSED_BLOCKED)) {
1651:                        return Trigger.STATE_PAUSED;
1652:                    }
1653:
1654:                    if (ts.equals(STATE_ERROR)) {
1655:                        return Trigger.STATE_ERROR;
1656:                    }
1657:
1658:                    if (ts.equals(STATE_BLOCKED)) {
1659:                        return Trigger.STATE_BLOCKED;
1660:                    }
1661:
1662:                    return Trigger.STATE_NORMAL;
1663:
1664:                } catch (SQLException e) {
1665:                    throw new JobPersistenceException(
1666:                            "Couldn't determine state of trigger (" + groupName
1667:                                    + "." + triggerName + "): "
1668:                                    + e.getMessage(), e);
1669:                }
1670:            }
1671:
1672:            /**
1673:             * <p>
1674:             * Store the given <code>{@link org.quartz.Calendar}</code>.
1675:             * </p>
1676:             * 
1677:             * @param calName
1678:             *          The name of the calendar.
1679:             * @param calendar
1680:             *          The <code>Calendar</code> to be stored.
1681:             * @param replaceExisting
1682:             *          If <code>true</code>, any <code>Calendar</code> existing
1683:             *          in the <code>JobStore</code> with the same name & group
1684:             *          should be over-written.
1685:             * @throws ObjectAlreadyExistsException
1686:             *           if a <code>Calendar</code> with the same name already
1687:             *           exists, and replaceExisting is set to false.
1688:             */
1689:            public void storeCalendar(final SchedulingContext ctxt,
1690:                    final String calName, final Calendar calendar,
1691:                    final boolean replaceExisting, final boolean updateTriggers)
1692:                    throws ObjectAlreadyExistsException,
1693:                    JobPersistenceException {
1694:                executeInLock(
1695:                        (isLockOnInsert() || updateTriggers) ? LOCK_TRIGGER_ACCESS
1696:                                : null, new VoidTransactionCallback() {
1697:                            public void execute(Connection conn)
1698:                                    throws JobPersistenceException {
1699:                                storeCalendar(conn, ctxt, calName, calendar,
1700:                                        replaceExisting, updateTriggers);
1701:                            }
1702:                        });
1703:            }
1704:
1705:            protected void storeCalendar(Connection conn,
1706:                    SchedulingContext ctxt, String calName, Calendar calendar,
1707:                    boolean replaceExisting, boolean updateTriggers)
1708:                    throws ObjectAlreadyExistsException,
1709:                    JobPersistenceException {
1710:                try {
1711:                    boolean existingCal = calendarExists(conn, calName);
1712:                    if (existingCal && !replaceExisting) {
1713:                        throw new ObjectAlreadyExistsException(
1714:                                "Calendar with name '" + calName
1715:                                        + "' already exists.");
1716:                    }
1717:
1718:                    if (existingCal) {
1719:                        if (getDelegate().updateCalendar(conn, calName,
1720:                                calendar) < 1) {
1721:                            throw new JobPersistenceException(
1722:                                    "Couldn't store calendar.  Update failed.");
1723:                        }
1724:
1725:                        if (updateTriggers) {
1726:                            Trigger[] trigs = getDelegate()
1727:                                    .selectTriggersForCalendar(conn, calName);
1728:
1729:                            for (int i = 0; i < trigs.length; i++) {
1730:                                trigs[i].updateWithNewCalendar(calendar,
1731:                                        getMisfireThreshold());
1732:                                storeTrigger(conn, ctxt, trigs[i], null, true,
1733:                                        STATE_WAITING, false, false);
1734:                            }
1735:                        }
1736:                    } else {
1737:                        if (getDelegate().insertCalendar(conn, calName,
1738:                                calendar) < 1) {
1739:                            throw new JobPersistenceException(
1740:                                    "Couldn't store calendar.  Insert failed.");
1741:                        }
1742:                    }
1743:
1744:                    if (isClustered == false) {
1745:                        calendarCache.put(calName, calendar); // lazy-cache
1746:                    }
1747:
1748:                } catch (IOException e) {
1749:                    throw new JobPersistenceException(
1750:                            "Couldn't store calendar because the BLOB couldn't be serialized: "
1751:                                    + e.getMessage(), e);
1752:                } catch (ClassNotFoundException e) {
1753:                    throw new JobPersistenceException(
1754:                            "Couldn't store calendar: " + e.getMessage(), e);
1755:                } catch (SQLException e) {
1756:                    throw new JobPersistenceException(
1757:                            "Couldn't store calendar: " + e.getMessage(), e);
1758:                }
1759:            }
1760:
1761:            protected boolean calendarExists(Connection conn, String calName)
1762:                    throws JobPersistenceException {
1763:                try {
1764:                    return getDelegate().calendarExists(conn, calName);
1765:                } catch (SQLException e) {
1766:                    throw new JobPersistenceException(
1767:                            "Couldn't determine calendar existence (" + calName
1768:                                    + "): " + e.getMessage(), e);
1769:                }
1770:            }
1771:
1772:            /**
1773:             * <p>
1774:             * Remove (delete) the <code>{@link org.quartz.Calendar}</code> with the
1775:             * given name.
1776:             * </p>
1777:             * 
1778:             * <p>
1779:             * If removal of the <code>Calendar</code> would result in
1780:             * <code.Trigger</code>s pointing to non-existent calendars, then a
1781:             * <code>JobPersistenceException</code> will be thrown.</p>
1782:             *       *
1783:             * @param calName The name of the <code>Calendar</code> to be removed.
1784:             * @return <code>true</code> if a <code>Calendar</code> with the given name
1785:             * was found and removed from the store.
1786:             */
1787:            public boolean removeCalendar(final SchedulingContext ctxt,
1788:                    final String calName) throws JobPersistenceException {
1789:                return ((Boolean) executeInLock(LOCK_TRIGGER_ACCESS,
1790:                        new TransactionCallback() {
1791:                            public Object execute(Connection conn)
1792:                                    throws JobPersistenceException {
1793:                                return removeCalendar(conn, ctxt, calName) ? Boolean.TRUE
1794:                                        : Boolean.FALSE;
1795:                            }
1796:                        })).booleanValue();
1797:            }
1798:
1799:            protected boolean removeCalendar(Connection conn,
1800:                    SchedulingContext ctxt, String calName)
1801:                    throws JobPersistenceException {
1802:                try {
1803:                    if (getDelegate().calendarIsReferenced(conn, calName)) {
1804:                        throw new JobPersistenceException(
1805:                                "Calender cannot be removed if it referenced by a trigger!");
1806:                    }
1807:
1808:                    if (isClustered == false) {
1809:                        calendarCache.remove(calName);
1810:                    }
1811:
1812:                    return (getDelegate().deleteCalendar(conn, calName) > 0);
1813:                } catch (SQLException e) {
1814:                    throw new JobPersistenceException(
1815:                            "Couldn't remove calendar: " + e.getMessage(), e);
1816:                }
1817:            }
1818:
1819:            /**
1820:             * <p>
1821:             * Retrieve the given <code>{@link org.quartz.Trigger}</code>.
1822:             * </p>
1823:             * 
1824:             * @param calName
1825:             *          The name of the <code>Calendar</code> to be retrieved.
1826:             * @return The desired <code>Calendar</code>, or null if there is no
1827:             *         match.
1828:             */
1829:            public Calendar retrieveCalendar(final SchedulingContext ctxt,
1830:                    final String calName) throws JobPersistenceException {
1831:                return (Calendar) executeWithoutLock( // no locks necessary for read...
1832:                new TransactionCallback() {
1833:                    public Object execute(Connection conn)
1834:                            throws JobPersistenceException {
1835:                        return retrieveCalendar(conn, ctxt, calName);
1836:                    }
1837:                });
1838:            }
1839:
1840:            protected Calendar retrieveCalendar(Connection conn,
1841:                    SchedulingContext ctxt, String calName)
1842:                    throws JobPersistenceException {
1843:                // all calendars are persistent, but we can lazy-cache them during run
1844:                // time as long as we aren't running clustered.
1845:                Calendar cal = (isClustered) ? null : (Calendar) calendarCache
1846:                        .get(calName);
1847:                if (cal != null) {
1848:                    return cal;
1849:                }
1850:
1851:                try {
1852:                    cal = getDelegate().selectCalendar(conn, calName);
1853:                    if (isClustered == false) {
1854:                        calendarCache.put(calName, cal); // lazy-cache...
1855:                    }
1856:                    return cal;
1857:                } catch (ClassNotFoundException e) {
1858:                    throw new JobPersistenceException(
1859:                            "Couldn't retrieve calendar because a required class was not found: "
1860:                                    + e.getMessage(), e);
1861:                } catch (IOException e) {
1862:                    throw new JobPersistenceException(
1863:                            "Couldn't retrieve calendar because the BLOB couldn't be deserialized: "
1864:                                    + e.getMessage(), e);
1865:                } catch (SQLException e) {
1866:                    throw new JobPersistenceException(
1867:                            "Couldn't retrieve calendar: " + e.getMessage(), e);
1868:                }
1869:            }
1870:
1871:            /**
1872:             * <p>
1873:             * Get the number of <code>{@link org.quartz.Job}</code> s that are
1874:             * stored in the <code>JobStore</code>.
1875:             * </p>
1876:             */
1877:            public int getNumberOfJobs(final SchedulingContext ctxt)
1878:                    throws JobPersistenceException {
1879:                return ((Integer) executeWithoutLock( // no locks necessary for read...
1880:                new TransactionCallback() {
1881:                    public Object execute(Connection conn)
1882:                            throws JobPersistenceException {
1883:                        return new Integer(getNumberOfJobs(conn, ctxt));
1884:                    }
1885:                })).intValue();
1886:            }
1887:
1888:            protected int getNumberOfJobs(Connection conn,
1889:                    SchedulingContext ctxt) throws JobPersistenceException {
1890:                try {
1891:                    return getDelegate().selectNumJobs(conn);
1892:                } catch (SQLException e) {
1893:                    throw new JobPersistenceException(
1894:                            "Couldn't obtain number of jobs: " + e.getMessage(),
1895:                            e);
1896:                }
1897:            }
1898:
1899:            /**
1900:             * <p>
1901:             * Get the number of <code>{@link org.quartz.Trigger}</code> s that are
1902:             * stored in the <code>JobsStore</code>.
1903:             * </p>
1904:             */
1905:            public int getNumberOfTriggers(final SchedulingContext ctxt)
1906:                    throws JobPersistenceException {
1907:                return ((Integer) executeWithoutLock( // no locks necessary for read...
1908:                new TransactionCallback() {
1909:                    public Object execute(Connection conn)
1910:                            throws JobPersistenceException {
1911:                        return new Integer(getNumberOfTriggers(conn, ctxt));
1912:                    }
1913:                })).intValue();
1914:            }
1915:
1916:            protected int getNumberOfTriggers(Connection conn,
1917:                    SchedulingContext ctxt) throws JobPersistenceException {
1918:                try {
1919:                    return getDelegate().selectNumTriggers(conn);
1920:                } catch (SQLException e) {
1921:                    throw new JobPersistenceException(
1922:                            "Couldn't obtain number of triggers: "
1923:                                    + e.getMessage(), e);
1924:                }
1925:            }
1926:
1927:            /**
1928:             * <p>
1929:             * Get the number of <code>{@link org.quartz.Calendar}</code> s that are
1930:             * stored in the <code>JobsStore</code>.
1931:             * </p>
1932:             */
1933:            public int getNumberOfCalendars(final SchedulingContext ctxt)
1934:                    throws JobPersistenceException {
1935:                return ((Integer) executeWithoutLock( // no locks necessary for read...
1936:                new TransactionCallback() {
1937:                    public Object execute(Connection conn)
1938:                            throws JobPersistenceException {
1939:                        return new Integer(getNumberOfCalendars(conn, ctxt));
1940:                    }
1941:                })).intValue();
1942:            }
1943:
1944:            protected int getNumberOfCalendars(Connection conn,
1945:                    SchedulingContext ctxt) throws JobPersistenceException {
1946:                try {
1947:                    return getDelegate().selectNumCalendars(conn);
1948:                } catch (SQLException e) {
1949:                    throw new JobPersistenceException(
1950:                            "Couldn't obtain number of calendars: "
1951:                                    + e.getMessage(), e);
1952:                }
1953:            }
1954:
1955:            /**
1956:             * <p>
1957:             * Get the names of all of the <code>{@link org.quartz.Job}</code> s that
1958:             * have the given group name.
1959:             * </p>
1960:             * 
1961:             * <p>
1962:             * If there are no jobs in the given group name, the result should be a
1963:             * zero-length array (not <code>null</code>).
1964:             * </p>
1965:             */
1966:            public String[] getJobNames(final SchedulingContext ctxt,
1967:                    final String groupName) throws JobPersistenceException {
1968:                return (String[]) executeWithoutLock( // no locks necessary for read...
1969:                new TransactionCallback() {
1970:                    public Object execute(Connection conn)
1971:                            throws JobPersistenceException {
1972:                        return getJobNames(conn, ctxt, groupName);
1973:                    }
1974:                });
1975:            }
1976:
1977:            protected String[] getJobNames(Connection conn,
1978:                    SchedulingContext ctxt, String groupName)
1979:                    throws JobPersistenceException {
1980:                String[] jobNames = null;
1981:
1982:                try {
1983:                    jobNames = getDelegate().selectJobsInGroup(conn, groupName);
1984:                } catch (SQLException e) {
1985:                    throw new JobPersistenceException(
1986:                            "Couldn't obtain job names: " + e.getMessage(), e);
1987:                }
1988:
1989:                return jobNames;
1990:            }
1991:
1992:            /**
1993:             * <p>
1994:             * Get the names of all of the <code>{@link org.quartz.Trigger}</code> s
1995:             * that have the given group name.
1996:             * </p>
1997:             * 
1998:             * <p>
1999:             * If there are no triggers in the given group name, the result should be a
2000:             * zero-length array (not <code>null</code>).
2001:             * </p>
2002:             */
2003:            public String[] getTriggerNames(final SchedulingContext ctxt,
2004:                    final String groupName) throws JobPersistenceException {
2005:                return (String[]) executeWithoutLock( // no locks necessary for read...
2006:                new TransactionCallback() {
2007:                    public Object execute(Connection conn)
2008:                            throws JobPersistenceException {
2009:                        return getTriggerNames(conn, ctxt, groupName);
2010:                    }
2011:                });
2012:            }
2013:
2014:            protected String[] getTriggerNames(Connection conn,
2015:                    SchedulingContext ctxt, String groupName)
2016:                    throws JobPersistenceException {
2017:
2018:                String[] trigNames = null;
2019:
2020:                try {
2021:                    trigNames = getDelegate().selectTriggersInGroup(conn,
2022:                            groupName);
2023:                } catch (SQLException e) {
2024:                    throw new JobPersistenceException(
2025:                            "Couldn't obtain trigger names: " + e.getMessage(),
2026:                            e);
2027:                }
2028:
2029:                return trigNames;
2030:            }
2031:
2032:            /**
2033:             * <p>
2034:             * Get the names of all of the <code>{@link org.quartz.Job}</code>
2035:             * groups.
2036:             * </p>
2037:             * 
2038:             * <p>
2039:             * If there are no known group names, the result should be a zero-length
2040:             * array (not <code>null</code>).
2041:             * </p>
2042:             */
2043:            public String[] getJobGroupNames(final SchedulingContext ctxt)
2044:                    throws JobPersistenceException {
2045:                return (String[]) executeWithoutLock( // no locks necessary for read...
2046:                new TransactionCallback() {
2047:                    public Object execute(Connection conn)
2048:                            throws JobPersistenceException {
2049:                        return getJobGroupNames(conn, ctxt);
2050:                    }
2051:                });
2052:            }
2053:
2054:            protected String[] getJobGroupNames(Connection conn,
2055:                    SchedulingContext ctxt) throws JobPersistenceException {
2056:
2057:                String[] groupNames = null;
2058:
2059:                try {
2060:                    groupNames = getDelegate().selectJobGroups(conn);
2061:                } catch (SQLException e) {
2062:                    throw new JobPersistenceException(
2063:                            "Couldn't obtain job groups: " + e.getMessage(), e);
2064:                }
2065:
2066:                return groupNames;
2067:            }
2068:
2069:            /**
2070:             * <p>
2071:             * Get the names of all of the <code>{@link org.quartz.Trigger}</code>
2072:             * groups.
2073:             * </p>
2074:             * 
2075:             * <p>
2076:             * If there are no known group names, the result should be a zero-length
2077:             * array (not <code>null</code>).
2078:             * </p>
2079:             */
2080:            public String[] getTriggerGroupNames(final SchedulingContext ctxt)
2081:                    throws JobPersistenceException {
2082:                return (String[]) executeWithoutLock( // no locks necessary for read...
2083:                new TransactionCallback() {
2084:                    public Object execute(Connection conn)
2085:                            throws JobPersistenceException {
2086:                        return getTriggerGroupNames(conn, ctxt);
2087:                    }
2088:                });
2089:            }
2090:
2091:            protected String[] getTriggerGroupNames(Connection conn,
2092:                    SchedulingContext ctxt) throws JobPersistenceException {
2093:
2094:                String[] groupNames = null;
2095:
2096:                try {
2097:                    groupNames = getDelegate().selectTriggerGroups(conn);
2098:                } catch (SQLException e) {
2099:                    throw new JobPersistenceException(
2100:                            "Couldn't obtain trigger groups: " + e.getMessage(),
2101:                            e);
2102:                }
2103:
2104:                return groupNames;
2105:            }
2106:
2107:            /**
2108:             * <p>
2109:             * Get the names of all of the <code>{@link org.quartz.Calendar}</code> s
2110:             * in the <code>JobStore</code>.
2111:             * </p>
2112:             * 
2113:             * <p>
2114:             * If there are no Calendars in the given group name, the result should be
2115:             * a zero-length array (not <code>null</code>).
2116:             * </p>
2117:             */
2118:            public String[] getCalendarNames(final SchedulingContext ctxt)
2119:                    throws JobPersistenceException {
2120:                return (String[]) executeWithoutLock( // no locks necessary for read...
2121:                new TransactionCallback() {
2122:                    public Object execute(Connection conn)
2123:                            throws JobPersistenceException {
2124:                        return getCalendarNames(conn, ctxt);
2125:                    }
2126:                });
2127:            }
2128:
2129:            protected String[] getCalendarNames(Connection conn,
2130:                    SchedulingContext ctxt) throws JobPersistenceException {
2131:                try {
2132:                    return getDelegate().selectCalendars(conn);
2133:                } catch (SQLException e) {
2134:                    throw new JobPersistenceException(
2135:                            "Couldn't obtain trigger groups: " + e.getMessage(),
2136:                            e);
2137:                }
2138:            }
2139:
2140:            /**
2141:             * <p>
2142:             * Get all of the Triggers that are associated to the given Job.
2143:             * </p>
2144:             * 
2145:             * <p>
2146:             * If there are no matches, a zero-length array should be returned.
2147:             * </p>
2148:             */
2149:            public Trigger[] getTriggersForJob(final SchedulingContext ctxt,
2150:                    final String jobName, final String groupName)
2151:                    throws JobPersistenceException {
2152:                return (Trigger[]) executeWithoutLock( // no locks necessary for read...
2153:                new TransactionCallback() {
2154:                    public Object execute(Connection conn)
2155:                            throws JobPersistenceException {
2156:                        return getTriggersForJob(conn, ctxt, jobName, groupName);
2157:                    }
2158:                });
2159:            }
2160:
2161:            protected Trigger[] getTriggersForJob(Connection conn,
2162:                    SchedulingContext ctxt, String jobName, String groupName)
2163:                    throws JobPersistenceException {
2164:                Trigger[] array = null;
2165:
2166:                try {
2167:                    array = getDelegate().selectTriggersForJob(conn, jobName,
2168:                            groupName);
2169:                } catch (Exception e) {
2170:                    throw new JobPersistenceException(
2171:                            "Couldn't obtain triggers for job: "
2172:                                    + e.getMessage(), e);
2173:                }
2174:
2175:                return array;
2176:            }
2177:
2178:            /**
2179:             * <p>
2180:             * Pause the <code>{@link org.quartz.Trigger}</code> with the given name.
2181:             * </p>
2182:             * 
2183:             * @see #resumeTrigger(SchedulingContext, String, String)
2184:             */
2185:            public void pauseTrigger(final SchedulingContext ctxt,
2186:                    final String triggerName, final String groupName)
2187:                    throws JobPersistenceException {
2188:                executeInLock(LOCK_TRIGGER_ACCESS,
2189:                        new VoidTransactionCallback() {
2190:                            public void execute(Connection conn)
2191:                                    throws JobPersistenceException {
2192:                                pauseTrigger(conn, ctxt, triggerName, groupName);
2193:                            }
2194:                        });
2195:            }
2196:
2197:            /**
2198:             * <p>
2199:             * Pause the <code>{@link org.quartz.Trigger}</code> with the given name.
2200:             * </p>
2201:             * 
2202:             * @see #resumeTrigger(Connection, SchedulingContext, String, String)
2203:             */
2204:            public void pauseTrigger(Connection conn, SchedulingContext ctxt,
2205:                    String triggerName, String groupName)
2206:                    throws JobPersistenceException {
2207:
2208:                try {
2209:                    String oldState = getDelegate().selectTriggerState(conn,
2210:                            triggerName, groupName);
2211:
2212:                    if (oldState.equals(STATE_WAITING)
2213:                            || oldState.equals(STATE_ACQUIRED)) {
2214:
2215:                        getDelegate().updateTriggerState(conn, triggerName,
2216:                                groupName, STATE_PAUSED);
2217:                    } else if (oldState.equals(STATE_BLOCKED)) {
2218:                        getDelegate().updateTriggerState(conn, triggerName,
2219:                                groupName, STATE_PAUSED_BLOCKED);
2220:                    }
2221:                } catch (SQLException e) {
2222:                    throw new JobPersistenceException(
2223:                            "Couldn't pause trigger '" + groupName + "."
2224:                                    + triggerName + "': " + e.getMessage(), e);
2225:                }
2226:            }
2227:
2228:            /**
2229:             * <p>
2230:             * Pause the <code>{@link org.quartz.Job}</code> with the given name - by
2231:             * pausing all of its current <code>Trigger</code>s.
2232:             * </p>
2233:             * 
2234:             * @see #resumeJob(SchedulingContext, String, String)
2235:             */
2236:            public void pauseJob(final SchedulingContext ctxt,
2237:                    final String jobName, final String groupName)
2238:                    throws JobPersistenceException {
2239:                executeInLock(LOCK_TRIGGER_ACCESS,
2240:                        new VoidTransactionCallback() {
2241:                            public void execute(Connection conn)
2242:                                    throws JobPersistenceException {
2243:                                Trigger[] triggers = getTriggersForJob(conn,
2244:                                        ctxt, jobName, groupName);
2245:                                for (int j = 0; j < triggers.length; j++) {
2246:                                    pauseTrigger(conn, ctxt, triggers[j]
2247:                                            .getName(), triggers[j].getGroup());
2248:                                }
2249:                            }
2250:                        });
2251:            }
2252:
2253:            /**
2254:             * <p>
2255:             * Pause all of the <code>{@link org.quartz.Job}s</code> in the given
2256:             * group - by pausing all of their <code>Trigger</code>s.
2257:             * </p>
2258:             * 
2259:             * @see #resumeJobGroup(SchedulingContext, String)
2260:             */
2261:            public void pauseJobGroup(final SchedulingContext ctxt,
2262:                    final String groupName) throws JobPersistenceException {
2263:                executeInLock(LOCK_TRIGGER_ACCESS,
2264:                        new VoidTransactionCallback() {
2265:                            public void execute(Connection conn)
2266:                                    throws JobPersistenceException {
2267:                                String[] jobNames = getJobNames(conn, ctxt,
2268:                                        groupName);
2269:
2270:                                for (int i = 0; i < jobNames.length; i++) {
2271:                                    Trigger[] triggers = getTriggersForJob(
2272:                                            conn, ctxt, jobNames[i], groupName);
2273:                                    for (int j = 0; j < triggers.length; j++) {
2274:                                        pauseTrigger(conn, ctxt, triggers[j]
2275:                                                .getName(), triggers[j]
2276:                                                .getGroup());
2277:                                    }
2278:                                }
2279:                            }
2280:                        });
2281:            }
2282:
2283:            /**
2284:             * Determines if a Trigger for the given job should be blocked.  
2285:             * State can only transition to STATE_PAUSED_BLOCKED/STATE_BLOCKED from 
2286:             * STATE_PAUSED/STATE_WAITING respectively.
2287:             * 
2288:             * @return STATE_PAUSED_BLOCKED, STATE_BLOCKED, or the currentState. 
2289:             */
2290:            protected String checkBlockedState(Connection conn,
2291:                    SchedulingContext ctxt, String jobName,
2292:                    String jobGroupName, String currentState)
2293:                    throws JobPersistenceException {
2294:
2295:                // State can only transition to BLOCKED from PAUSED or WAITING.
2296:                if ((currentState.equals(STATE_WAITING) == false)
2297:                        && (currentState.equals(STATE_PAUSED) == false)) {
2298:                    return currentState;
2299:                }
2300:
2301:                try {
2302:                    List lst = getDelegate().selectFiredTriggerRecordsByJob(
2303:                            conn, jobName, jobGroupName);
2304:
2305:                    if (lst.size() > 0) {
2306:                        FiredTriggerRecord rec = (FiredTriggerRecord) lst
2307:                                .get(0);
2308:                        if (rec.isJobIsStateful()) { // TODO: worry about
2309:                            // failed/recovering/volatile job
2310:                            // states?
2311:                            return (STATE_PAUSED.equals(currentState)) ? STATE_PAUSED_BLOCKED
2312:                                    : STATE_BLOCKED;
2313:                        }
2314:                    }
2315:
2316:                    return currentState;
2317:                } catch (SQLException e) {
2318:                    throw new JobPersistenceException(
2319:                            "Couldn't determine if trigger should be in a blocked state '"
2320:                                    + jobGroupName + "." + jobName + "': "
2321:                                    + e.getMessage(), e);
2322:                }
2323:
2324:            }
2325:
2326:            /*
2327:             * private List findTriggersToBeBlocked(Connection conn, SchedulingContext
2328:             * ctxt, String groupName) throws JobPersistenceException {
2329:             * 
2330:             * try { List blockList = new LinkedList();
2331:             * 
2332:             * List affectingJobs =
2333:             * getDelegate().selectStatefulJobsOfTriggerGroup(conn, groupName);
2334:             * 
2335:             * Iterator itr = affectingJobs.iterator(); while(itr.hasNext()) { Key
2336:             * jobKey = (Key) itr.next();
2337:             * 
2338:             * List lst = getDelegate().selectFiredTriggerRecordsByJob(conn,
2339:             * jobKey.getName(), jobKey.getGroup());
2340:             * 
2341:             * This logic is BROKEN...
2342:             * 
2343:             * if(lst.size() > 0) { FiredTriggerRecord rec =
2344:             * (FiredTriggerRecord)lst.get(0); if(rec.isJobIsStateful()) // TODO: worry
2345:             * about failed/recovering/volatile job states? blockList.add(
2346:             * rec.getTriggerKey() ); } }
2347:             * 
2348:             * 
2349:             * return blockList; } catch (SQLException e) { throw new
2350:             * JobPersistenceException ("Couldn't determine states of resumed triggers
2351:             * in group '" + groupName + "': " + e.getMessage(), e); } }
2352:             */
2353:
2354:            /**
2355:             * <p>
2356:             * Resume (un-pause) the <code>{@link org.quartz.Trigger}</code> with the
2357:             * given name.
2358:             * </p>
2359:             * 
2360:             * <p>
2361:             * If the <code>Trigger</code> missed one or more fire-times, then the
2362:             * <code>Trigger</code>'s misfire instruction will be applied.
2363:             * </p>
2364:             * 
2365:             * @see #pauseTrigger(SchedulingContext, String, String)
2366:             */
2367:            public void resumeTrigger(final SchedulingContext ctxt,
2368:                    final String triggerName, final String groupName)
2369:                    throws JobPersistenceException {
2370:                executeInLock(LOCK_TRIGGER_ACCESS,
2371:                        new VoidTransactionCallback() {
2372:                            public void execute(Connection conn)
2373:                                    throws JobPersistenceException {
2374:                                resumeTrigger(conn, ctxt, triggerName,
2375:                                        groupName);
2376:                            }
2377:                        });
2378:            }
2379:
2380:            /**
2381:             * <p>
2382:             * Resume (un-pause) the <code>{@link org.quartz.Trigger}</code> with the
2383:             * given name.
2384:             * </p>
2385:             * 
2386:             * <p>
2387:             * If the <code>Trigger</code> missed one or more fire-times, then the
2388:             * <code>Trigger</code>'s misfire instruction will be applied.
2389:             * </p>
2390:             * 
2391:             * @see #pauseTrigger(Connection, SchedulingContext, String, String)
2392:             */
2393:            public void resumeTrigger(Connection conn, SchedulingContext ctxt,
2394:                    String triggerName, String groupName)
2395:                    throws JobPersistenceException {
2396:                try {
2397:
2398:                    TriggerStatus status = getDelegate().selectTriggerStatus(
2399:                            conn, triggerName, groupName);
2400:
2401:                    if (status == null || status.getNextFireTime() == null) {
2402:                        return;
2403:                    }
2404:
2405:                    boolean blocked = false;
2406:                    if (STATE_PAUSED_BLOCKED.equals(status.getStatus())) {
2407:                        blocked = true;
2408:                    }
2409:
2410:                    String newState = checkBlockedState(conn, ctxt, status
2411:                            .getJobKey().getName(), status.getJobKey()
2412:                            .getGroup(), STATE_WAITING);
2413:
2414:                    boolean misfired = false;
2415:
2416:                    if (status.getNextFireTime().before(new Date())) {
2417:                        misfired = updateMisfiredTrigger(conn, ctxt,
2418:                                triggerName, groupName, newState, true);
2419:                    }
2420:
2421:                    if (!misfired) {
2422:                        if (blocked) {
2423:                            getDelegate().updateTriggerStateFromOtherState(
2424:                                    conn, triggerName, groupName, newState,
2425:                                    STATE_PAUSED_BLOCKED);
2426:                        } else {
2427:                            getDelegate().updateTriggerStateFromOtherState(
2428:                                    conn, triggerName, groupName, newState,
2429:                                    STATE_PAUSED);
2430:                        }
2431:                    }
2432:
2433:                } catch (SQLException e) {
2434:                    throw new JobPersistenceException(
2435:                            "Couldn't resume trigger '" + groupName + "."
2436:                                    + triggerName + "': " + e.getMessage(), e);
2437:                }
2438:            }
2439:
2440:            /**
2441:             * <p>
2442:             * Resume (un-pause) the <code>{@link org.quartz.Job}</code> with the
2443:             * given name.
2444:             * </p>
2445:             * 
2446:             * <p>
2447:             * If any of the <code>Job</code>'s<code>Trigger</code> s missed one
2448:             * or more fire-times, then the <code>Trigger</code>'s misfire
2449:             * instruction will be applied.
2450:             * </p>
2451:             * 
2452:             * @see #pauseJob(SchedulingContext, String, String)
2453:             */
2454:            public void resumeJob(final SchedulingContext ctxt,
2455:                    final String jobName, final String groupName)
2456:                    throws JobPersistenceException {
2457:                executeInLock(LOCK_TRIGGER_ACCESS,
2458:                        new VoidTransactionCallback() {
2459:                            public void execute(Connection conn)
2460:                                    throws JobPersistenceException {
2461:                                Trigger[] triggers = getTriggersForJob(conn,
2462:                                        ctxt, jobName, groupName);
2463:                                for (int j = 0; j < triggers.length; j++) {
2464:                                    resumeTrigger(conn, ctxt, triggers[j]
2465:                                            .getName(), triggers[j].getGroup());
2466:                                }
2467:                            }
2468:                        });
2469:            }
2470:
2471:            /**
2472:             * <p>
2473:             * Resume (un-pause) all of the <code>{@link org.quartz.Job}s</code> in
2474:             * the given group.
2475:             * </p>
2476:             * 
2477:             * <p>
2478:             * If any of the <code>Job</code> s had <code>Trigger</code> s that
2479:             * missed one or more fire-times, then the <code>Trigger</code>'s
2480:             * misfire instruction will be applied.
2481:             * </p>
2482:             * 
2483:             * @see #pauseJobGroup(SchedulingContext, String)
2484:             */
2485:            public void resumeJobGroup(final SchedulingContext ctxt,
2486:                    final String groupName) throws JobPersistenceException {
2487:                executeInLock(LOCK_TRIGGER_ACCESS,
2488:                        new VoidTransactionCallback() {
2489:                            public void execute(Connection conn)
2490:                                    throws JobPersistenceException {
2491:                                String[] jobNames = getJobNames(conn, ctxt,
2492:                                        groupName);
2493:
2494:                                for (int i = 0; i < jobNames.length; i++) {
2495:                                    Trigger[] triggers = getTriggersForJob(
2496:                                            conn, ctxt, jobNames[i], groupName);
2497:                                    for (int j = 0; j < triggers.length; j++) {
2498:                                        resumeTrigger(conn, ctxt, triggers[j]
2499:                                                .getName(), triggers[j]
2500:                                                .getGroup());
2501:                                    }
2502:                                }
2503:                            }
2504:                        });
2505:            }
2506:
2507:            /**
2508:             * <p>
2509:             * Pause all of the <code>{@link org.quartz.Trigger}s</code> in the
2510:             * given group.
2511:             * </p>
2512:             * 
2513:             * @see #resumeTriggerGroup(SchedulingContext, String)
2514:             */
2515:            public void pauseTriggerGroup(final SchedulingContext ctxt,
2516:                    final String groupName) throws JobPersistenceException {
2517:                executeInLock(LOCK_TRIGGER_ACCESS,
2518:                        new VoidTransactionCallback() {
2519:                            public void execute(Connection conn)
2520:                                    throws JobPersistenceException {
2521:                                pauseTriggerGroup(conn, ctxt, groupName);
2522:                            }
2523:                        });
2524:            }
2525:
2526:            /**
2527:             * <p>
2528:             * Pause all of the <code>{@link org.quartz.Trigger}s</code> in the
2529:             * given group.
2530:             * </p>
2531:             * 
2532:             * @see #resumeTriggerGroup(Connection, SchedulingContext, String)
2533:             */
2534:            public void pauseTriggerGroup(Connection conn,
2535:                    SchedulingContext ctxt, String groupName)
2536:                    throws JobPersistenceException {
2537:
2538:                try {
2539:
2540:                    getDelegate().updateTriggerGroupStateFromOtherStates(conn,
2541:                            groupName, STATE_PAUSED, STATE_ACQUIRED,
2542:                            STATE_WAITING, STATE_WAITING);
2543:
2544:                    getDelegate().updateTriggerGroupStateFromOtherState(conn,
2545:                            groupName, STATE_PAUSED_BLOCKED, STATE_BLOCKED);
2546:
2547:                    if (!getDelegate().isTriggerGroupPaused(conn, groupName)) {
2548:                        getDelegate().insertPausedTriggerGroup(conn, groupName);
2549:                    }
2550:
2551:                } catch (SQLException e) {
2552:                    throw new JobPersistenceException(
2553:                            "Couldn't pause trigger group '" + groupName
2554:                                    + "': " + e.getMessage(), e);
2555:                }
2556:            }
2557:
2558:            public Set getPausedTriggerGroups(final SchedulingContext ctxt)
2559:                    throws JobPersistenceException {
2560:                return (Set) executeWithoutLock( // no locks necessary for read...
2561:                new TransactionCallback() {
2562:                    public Object execute(Connection conn)
2563:                            throws JobPersistenceException {
2564:                        return getPausedTriggerGroups(conn, ctxt);
2565:                    }
2566:                });
2567:            }
2568:
2569:            /**
2570:             * <p>
2571:             * Pause all of the <code>{@link org.quartz.Trigger}s</code> in the
2572:             * given group.
2573:             * </p>
2574:             * 
2575:             * @see #resumeTriggerGroup(Connection, SchedulingContext, String)
2576:             */
2577:            public Set getPausedTriggerGroups(Connection conn,
2578:                    SchedulingContext ctxt) throws JobPersistenceException {
2579:
2580:                try {
2581:                    return getDelegate().selectPausedTriggerGroups(conn);
2582:                } catch (SQLException e) {
2583:                    throw new JobPersistenceException(
2584:                            "Couldn't determine paused trigger groups: "
2585:                                    + e.getMessage(), e);
2586:                }
2587:            }
2588:
2589:            /**
2590:             * <p>
2591:             * Resume (un-pause) all of the <code>{@link org.quartz.Trigger}s</code>
2592:             * in the given group.
2593:             * </p>
2594:             * 
2595:             * <p>
2596:             * If any <code>Trigger</code> missed one or more fire-times, then the
2597:             * <code>Trigger</code>'s misfire instruction will be applied.
2598:             * </p>
2599:             * 
2600:             * @see #pauseTriggerGroup(SchedulingContext, String)
2601:             */
2602:            public void resumeTriggerGroup(final SchedulingContext ctxt,
2603:                    final String groupName) throws JobPersistenceException {
2604:                executeInLock(LOCK_TRIGGER_ACCESS,
2605:                        new VoidTransactionCallback() {
2606:                            public void execute(Connection conn)
2607:                                    throws JobPersistenceException {
2608:                                resumeTriggerGroup(conn, ctxt, groupName);
2609:                            }
2610:                        });
2611:            }
2612:
2613:            /**
2614:             * <p>
2615:             * Resume (un-pause) all of the <code>{@link org.quartz.Trigger}s</code>
2616:             * in the given group.
2617:             * </p>
2618:             * 
2619:             * <p>
2620:             * If any <code>Trigger</code> missed one or more fire-times, then the
2621:             * <code>Trigger</code>'s misfire instruction will be applied.
2622:             * </p>
2623:             * 
2624:             * @see #pauseTriggerGroup(Connection, SchedulingContext, String)
2625:             */
2626:            public void resumeTriggerGroup(Connection conn,
2627:                    SchedulingContext ctxt, String groupName)
2628:                    throws JobPersistenceException {
2629:
2630:                try {
2631:
2632:                    getDelegate().deletePausedTriggerGroup(conn, groupName);
2633:
2634:                    String[] trigNames = getDelegate().selectTriggersInGroup(
2635:                            conn, groupName);
2636:
2637:                    for (int i = 0; i < trigNames.length; i++) {
2638:                        resumeTrigger(conn, ctxt, trigNames[i], groupName);
2639:                    }
2640:
2641:                    // TODO: find an efficient way to resume triggers (better than the
2642:                    // above)... logic below is broken because of
2643:                    // findTriggersToBeBlocked()
2644:                    /*
2645:                     * int res =
2646:                     * getDelegate().updateTriggerGroupStateFromOtherState(conn,
2647:                     * groupName, STATE_WAITING, STATE_PAUSED);
2648:                     * 
2649:                     * if(res > 0) {
2650:                     * 
2651:                     * long misfireTime = System.currentTimeMillis();
2652:                     * if(getMisfireThreshold() > 0) misfireTime -=
2653:                     * getMisfireThreshold();
2654:                     * 
2655:                     * Key[] misfires =
2656:                     * getDelegate().selectMisfiredTriggersInGroupInState(conn,
2657:                     * groupName, STATE_WAITING, misfireTime);
2658:                     * 
2659:                     * List blockedTriggers = findTriggersToBeBlocked(conn, ctxt,
2660:                     * groupName);
2661:                     * 
2662:                     * Iterator itr = blockedTriggers.iterator(); while(itr.hasNext()) {
2663:                     * Key key = (Key)itr.next();
2664:                     * getDelegate().updateTriggerState(conn, key.getName(),
2665:                     * key.getGroup(), STATE_BLOCKED); }
2666:                     * 
2667:                     * for(int i=0; i < misfires.length; i++) {               String
2668:                     * newState = STATE_WAITING;
2669:                     * if(blockedTriggers.contains(misfires[i])) newState =
2670:                     * STATE_BLOCKED; updateMisfiredTrigger(conn, ctxt,
2671:                     * misfires[i].getName(), misfires[i].getGroup(), newState, true); } }
2672:                     */
2673:
2674:                } catch (SQLException e) {
2675:                    throw new JobPersistenceException(
2676:                            "Couldn't pause trigger group '" + groupName
2677:                                    + "': " + e.getMessage(), e);
2678:                }
2679:            }
2680:
2681:            /**
2682:             * <p>
2683:             * Pause all triggers - equivalent of calling <code>pauseTriggerGroup(group)</code>
2684:             * on every group.
2685:             * </p>
2686:             * 
2687:             * <p>
2688:             * When <code>resumeAll()</code> is called (to un-pause), trigger misfire
2689:             * instructions WILL be applied.
2690:             * </p>
2691:             * 
2692:             * @see #resumeAll(SchedulingContext)
2693:             * @see #pauseTriggerGroup(SchedulingContext, String)
2694:             */
2695:            public void pauseAll(final SchedulingContext ctxt)
2696:                    throws JobPersistenceException {
2697:                executeInLock(LOCK_TRIGGER_ACCESS,
2698:                        new VoidTransactionCallback() {
2699:                            public void execute(Connection conn)
2700:                                    throws JobPersistenceException {
2701:                                pauseAll(conn, ctxt);
2702:                            }
2703:                        });
2704:            }
2705:
2706:            /**
2707:             * <p>
2708:             * Pause all triggers - equivalent of calling <code>pauseTriggerGroup(group)</code>
2709:             * on every group.
2710:             * </p>
2711:             * 
2712:             * <p>
2713:             * When <code>resumeAll()</code> is called (to un-pause), trigger misfire
2714:             * instructions WILL be applied.
2715:             * </p>
2716:             * 
2717:             * @see #resumeAll(SchedulingContext)
2718:             * @see #pauseTriggerGroup(SchedulingContext, String)
2719:             */
2720:            public void pauseAll(Connection conn, SchedulingContext ctxt)
2721:                    throws JobPersistenceException {
2722:
2723:                String[] names = getTriggerGroupNames(conn, ctxt);
2724:
2725:                for (int i = 0; i < names.length; i++) {
2726:                    pauseTriggerGroup(conn, ctxt, names[i]);
2727:                }
2728:
2729:                try {
2730:                    if (!getDelegate().isTriggerGroupPaused(conn,
2731:                            ALL_GROUPS_PAUSED)) {
2732:                        getDelegate().insertPausedTriggerGroup(conn,
2733:                                ALL_GROUPS_PAUSED);
2734:                    }
2735:
2736:                } catch (SQLException e) {
2737:                    throw new JobPersistenceException(
2738:                            "Couldn't pause all trigger groups: "
2739:                                    + e.getMessage(), e);
2740:                }
2741:
2742:            }
2743:
2744:            /**
2745:             * <p>
2746:             * Resume (un-pause) all triggers - equivalent of calling <code>resumeTriggerGroup(group)</code>
2747:             * on every group.
2748:             * </p>
2749:             * 
2750:             * <p>
2751:             * If any <code>Trigger</code> missed one or more fire-times, then the
2752:             * <code>Trigger</code>'s misfire instruction will be applied.
2753:             * </p>
2754:             * 
2755:             * @see #pauseAll(SchedulingContext)
2756:             */
2757:            public void resumeAll(final SchedulingContext ctxt)
2758:                    throws JobPersistenceException {
2759:                executeInLock(LOCK_TRIGGER_ACCESS,
2760:                        new VoidTransactionCallback() {
2761:                            public void execute(Connection conn)
2762:                                    throws JobPersistenceException {
2763:                                resumeAll(conn, ctxt);
2764:                            }
2765:                        });
2766:            }
2767:
2768:            /**
2769:             * protected
2770:             * <p>
2771:             * Resume (un-pause) all triggers - equivalent of calling <code>resumeTriggerGroup(group)</code>
2772:             * on every group.
2773:             * </p>
2774:             * 
2775:             * <p>
2776:             * If any <code>Trigger</code> missed one or more fire-times, then the
2777:             * <code>Trigger</code>'s misfire instruction will be applied.
2778:             * </p>
2779:             * 
2780:             * @see #pauseAll(SchedulingContext)
2781:             */
2782:            public void resumeAll(Connection conn, SchedulingContext ctxt)
2783:                    throws JobPersistenceException {
2784:
2785:                String[] names = getTriggerGroupNames(conn, ctxt);
2786:
2787:                for (int i = 0; i < names.length; i++) {
2788:                    resumeTriggerGroup(conn, ctxt, names[i]);
2789:                }
2790:
2791:                try {
2792:                    getDelegate().deletePausedTriggerGroup(conn,
2793:                            ALL_GROUPS_PAUSED);
2794:                } catch (SQLException e) {
2795:                    throw new JobPersistenceException(
2796:                            "Couldn't resume all trigger groups: "
2797:                                    + e.getMessage(), e);
2798:                }
2799:            }
2800:
2801:            private static long ftrCtr = System.currentTimeMillis();
2802:
2803:            protected synchronized String getFiredTriggerRecordId() {
2804:                return getInstanceId() + ftrCtr++;
2805:            }
2806:
2807:            /**
2808:             * <p>
2809:             * Get a handle to the next N triggers to be fired, and mark them as 'reserved'
2810:             * by the calling scheduler.
2811:             * </p>
2812:             * 
2813:             * @see #releaseAcquiredTrigger(SchedulingContext, Trigger)
2814:             */
2815:            public Trigger acquireNextTrigger(final SchedulingContext ctxt,
2816:                    final long noLaterThan) throws JobPersistenceException {
2817:                return (Trigger) executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
2818:                        new TransactionCallback() {
2819:                            public Object execute(Connection conn)
2820:                                    throws JobPersistenceException {
2821:                                return acquireNextTrigger(conn, ctxt,
2822:                                        noLaterThan);
2823:                            }
2824:                        });
2825:            }
2826:
2827:            // TODO: this really ought to return something like a FiredTriggerBundle,
2828:            // so that the fireInstanceId doesn't have to be on the trigger...
2829:            protected Trigger acquireNextTrigger(Connection conn,
2830:                    SchedulingContext ctxt, long noLaterThan)
2831:                    throws JobPersistenceException {
2832:                do {
2833:                    try {
2834:                        Key triggerKey = getDelegate().selectTriggerToAcquire(
2835:                                conn, noLaterThan, getMisfireTime());
2836:
2837:                        // No trigger is ready to fire yet.
2838:                        if (triggerKey == null) {
2839:                            return null;
2840:                        }
2841:
2842:                        int rowsUpdated = getDelegate()
2843:                                .updateTriggerStateFromOtherState(conn,
2844:                                        triggerKey.getName(),
2845:                                        triggerKey.getGroup(), STATE_ACQUIRED,
2846:                                        STATE_WAITING);
2847:
2848:                        // If our trigger was no longer in the expected state, try a new one.
2849:                        if (rowsUpdated <= 0) {
2850:                            continue;
2851:                        }
2852:
2853:                        Trigger nextTrigger = retrieveTrigger(conn, ctxt,
2854:                                triggerKey.getName(), triggerKey.getGroup());
2855:
2856:                        // If our trigger is no longer available, try a new one.
2857:                        if (nextTrigger == null) {
2858:                            continue;
2859:                        }
2860:
2861:                        nextTrigger
2862:                                .setFireInstanceId(getFiredTriggerRecordId());
2863:                        getDelegate().insertFiredTrigger(conn, nextTrigger,
2864:                                STATE_ACQUIRED, null);
2865:
2866:                        return nextTrigger;
2867:                    } catch (Exception e) {
2868:                        throw new JobPersistenceException(
2869:                                "Couldn't acquire next trigger: "
2870:                                        + e.getMessage(), e);
2871:                    }
2872:                } while (true);
2873:            }
2874:
2875:            /**
2876:             * <p>
2877:             * Inform the <code>JobStore</code> that the scheduler no longer plans to
2878:             * fire the given <code>Trigger</code>, that it had previously acquired
2879:             * (reserved).
2880:             * </p>
2881:             */
2882:            public void releaseAcquiredTrigger(final SchedulingContext ctxt,
2883:                    final Trigger trigger) throws JobPersistenceException {
2884:                executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
2885:                        new VoidTransactionCallback() {
2886:                            public void execute(Connection conn)
2887:                                    throws JobPersistenceException {
2888:                                releaseAcquiredTrigger(conn, ctxt, trigger);
2889:                            }
2890:                        });
2891:            }
2892:
2893:            protected void releaseAcquiredTrigger(Connection conn,
2894:                    SchedulingContext ctxt, Trigger trigger)
2895:                    throws JobPersistenceException {
2896:                try {
2897:                    getDelegate().updateTriggerStateFromOtherState(conn,
2898:                            trigger.getName(), trigger.getGroup(),
2899:                            STATE_WAITING, STATE_ACQUIRED);
2900:                    getDelegate().deleteFiredTrigger(conn,
2901:                            trigger.getFireInstanceId());
2902:                } catch (SQLException e) {
2903:                    throw new JobPersistenceException(
2904:                            "Couldn't release acquired trigger: "
2905:                                    + e.getMessage(), e);
2906:                }
2907:            }
2908:
2909:            /**
2910:             * <p>
2911:             * Inform the <code>JobStore</code> that the scheduler is now firing the
2912:             * given <code>Trigger</code> (executing its associated <code>Job</code>),
2913:             * that it had previously acquired (reserved).
2914:             * </p>
2915:             * 
2916:             * @return null if the trigger or its job or calendar no longer exist, or
2917:             *         if the trigger was not successfully put into the 'executing'
2918:             *         state.
2919:             */
2920:            public TriggerFiredBundle triggerFired(
2921:                    final SchedulingContext ctxt, final Trigger trigger)
2922:                    throws JobPersistenceException {
2923:                return (TriggerFiredBundle) executeInNonManagedTXLock(
2924:                        LOCK_TRIGGER_ACCESS, new TransactionCallback() {
2925:                            public Object execute(Connection conn)
2926:                                    throws JobPersistenceException {
2927:                                try {
2928:                                    return triggerFired(conn, ctxt, trigger);
2929:                                } catch (JobPersistenceException jpe) {
2930:                                    // If job didn't exisit, we still want to commit our work and return null.
2931:                                    if (jpe.getErrorCode() == SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST) {
2932:                                        return null;
2933:                                    } else {
2934:                                        throw jpe;
2935:                                    }
2936:                                }
2937:                            }
2938:                        });
2939:            }
2940:
2941:            protected TriggerFiredBundle triggerFired(Connection conn,
2942:                    SchedulingContext ctxt, Trigger trigger)
2943:                    throws JobPersistenceException {
2944:                JobDetail job = null;
2945:                Calendar cal = null;
2946:
2947:                // Make sure trigger wasn't deleted, paused, or completed...
2948:                try { // if trigger was deleted, state will be STATE_DELETED
2949:                    String state = getDelegate().selectTriggerState(conn,
2950:                            trigger.getName(), trigger.getGroup());
2951:                    if (!state.equals(STATE_ACQUIRED)) {
2952:                        return null;
2953:                    }
2954:                } catch (SQLException e) {
2955:                    throw new JobPersistenceException(
2956:                            "Couldn't select trigger state: " + e.getMessage(),
2957:                            e);
2958:                }
2959:
2960:                try {
2961:                    job = retrieveJob(conn, ctxt, trigger.getJobName(), trigger
2962:                            .getJobGroup());
2963:                    if (job == null) {
2964:                        return null;
2965:                    }
2966:                } catch (JobPersistenceException jpe) {
2967:                    try {
2968:                        getLog()
2969:                                .error(
2970:                                        "Error retrieving job, setting trigger state to ERROR.",
2971:                                        jpe);
2972:                        getDelegate().updateTriggerState(conn,
2973:                                trigger.getName(), trigger.getGroup(),
2974:                                STATE_ERROR);
2975:                    } catch (SQLException sqle) {
2976:                        getLog().error("Unable to set trigger state to ERROR.",
2977:                                sqle);
2978:                    }
2979:                    throw jpe;
2980:                }
2981:
2982:                if (trigger.getCalendarName() != null) {
2983:                    cal = retrieveCalendar(conn, ctxt, trigger
2984:                            .getCalendarName());
2985:                    if (cal == null) {
2986:                        return null;
2987:                    }
2988:                }
2989:
2990:                try {
2991:                    getDelegate().deleteFiredTrigger(conn,
2992:                            trigger.getFireInstanceId());
2993:                    getDelegate().insertFiredTrigger(conn, trigger,
2994:                            STATE_EXECUTING, job);
2995:                } catch (SQLException e) {
2996:                    throw new JobPersistenceException(
2997:                            "Couldn't insert fired trigger: " + e.getMessage(),
2998:                            e);
2999:                }
3000:
3001:                Date prevFireTime = trigger.getPreviousFireTime();
3002:
3003:                // call triggered - to update the trigger's next-fire-time state...
3004:                trigger.triggered(cal);
3005:
3006:                String state = STATE_WAITING;
3007:                boolean force = true;
3008:
3009:                if (job.isStateful()) {
3010:                    state = STATE_BLOCKED;
3011:                    force = false;
3012:                    try {
3013:                        getDelegate().updateTriggerStatesForJobFromOtherState(
3014:                                conn, job.getName(), job.getGroup(),
3015:                                STATE_BLOCKED, STATE_WAITING);
3016:                        getDelegate().updateTriggerStatesForJobFromOtherState(
3017:                                conn, job.getName(), job.getGroup(),
3018:                                STATE_BLOCKED, STATE_ACQUIRED);
3019:                        getDelegate().updateTriggerStatesForJobFromOtherState(
3020:                                conn, job.getName(), job.getGroup(),
3021:                                STATE_PAUSED_BLOCKED, STATE_PAUSED);
3022:                    } catch (SQLException e) {
3023:                        throw new JobPersistenceException(
3024:                                "Couldn't update states of blocked triggers: "
3025:                                        + e.getMessage(), e);
3026:                    }
3027:                }
3028:
3029:                if (trigger.getNextFireTime() == null) {
3030:                    state = STATE_COMPLETE;
3031:                    force = true;
3032:                }
3033:
3034:                storeTrigger(conn, ctxt, trigger, job, true, state, force,
3035:                        false);
3036:
3037:                job.getJobDataMap().clearDirtyFlag();
3038:
3039:                return new TriggerFiredBundle(job, trigger, cal, trigger
3040:                        .getGroup().equals(Scheduler.DEFAULT_RECOVERY_GROUP),
3041:                        new Date(), trigger.getPreviousFireTime(),
3042:                        prevFireTime, trigger.getNextFireTime());
3043:            }
3044:
3045:            /**
3046:             * <p>
3047:             * Inform the <code>JobStore</code> that the scheduler has completed the
3048:             * firing of the given <code>Trigger</code> (and the execution its
3049:             * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code>
3050:             * in the given <code>JobDetail</code> should be updated if the <code>Job</code>
3051:             * is stateful.
3052:             * </p>
3053:             */
3054:            public void triggeredJobComplete(final SchedulingContext ctxt,
3055:                    final Trigger trigger, final JobDetail jobDetail,
3056:                    final int triggerInstCode) throws JobPersistenceException {
3057:                executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
3058:                        new VoidTransactionCallback() {
3059:                            public void execute(Connection conn)
3060:                                    throws JobPersistenceException {
3061:                                triggeredJobComplete(conn, ctxt, trigger,
3062:                                        jobDetail, triggerInstCode);
3063:                            }
3064:                        });
3065:            }
3066:
3067:            protected void triggeredJobComplete(Connection conn,
3068:                    SchedulingContext ctxt, Trigger trigger,
3069:                    JobDetail jobDetail, int triggerInstCode)
3070:                    throws JobPersistenceException {
3071:                try {
3072:                    if (triggerInstCode == Trigger.INSTRUCTION_DELETE_TRIGGER) {
3073:                        if (trigger.getNextFireTime() == null) {
3074:                            // double check for possible reschedule within job 
3075:                            // execution, which would cancel the need to delete...
3076:                            TriggerStatus stat = getDelegate()
3077:                                    .selectTriggerStatus(conn,
3078:                                            trigger.getName(),
3079:                                            trigger.getGroup());
3080:                            if (stat != null && stat.getNextFireTime() == null) {
3081:                                removeTrigger(conn, ctxt, trigger.getName(),
3082:                                        trigger.getGroup());
3083:                            }
3084:                        } else {
3085:                            removeTrigger(conn, ctxt, trigger.getName(),
3086:                                    trigger.getGroup());
3087:                        }
3088:                    } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_COMPLETE) {
3089:                        getDelegate().updateTriggerState(conn,
3090:                                trigger.getName(), trigger.getGroup(),
3091:                                STATE_COMPLETE);
3092:                    } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_ERROR) {
3093:                        getLog().info(
3094:                                "Trigger " + trigger.getFullName()
3095:                                        + " set to ERROR state.");
3096:                        getDelegate().updateTriggerState(conn,
3097:                                trigger.getName(), trigger.getGroup(),
3098:                                STATE_ERROR);
3099:                    } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_COMPLETE) {
3100:                        getDelegate().updateTriggerStatesForJob(conn,
3101:                                trigger.getJobName(), trigger.getJobGroup(),
3102:                                STATE_COMPLETE);
3103:                    } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR) {
3104:                        getLog().info(
3105:                                "All triggers of Job "
3106:                                        + trigger.getFullJobName()
3107:                                        + " set to ERROR state.");
3108:                        getDelegate().updateTriggerStatesForJob(conn,
3109:                                trigger.getJobName(), trigger.getJobGroup(),
3110:                                STATE_ERROR);
3111:                    }
3112:
3113:                    if (jobDetail.isStateful()) {
3114:                        getDelegate().updateTriggerStatesForJobFromOtherState(
3115:                                conn, jobDetail.getName(),
3116:                                jobDetail.getGroup(), STATE_WAITING,
3117:                                STATE_BLOCKED);
3118:
3119:                        getDelegate().updateTriggerStatesForJobFromOtherState(
3120:                                conn, jobDetail.getName(),
3121:                                jobDetail.getGroup(), STATE_PAUSED,
3122:                                STATE_PAUSED_BLOCKED);
3123:
3124:                        try {
3125:                            if (jobDetail.getJobDataMap().isDirty()) {
3126:                                getDelegate().updateJobData(conn, jobDetail);
3127:                            }
3128:                        } catch (IOException e) {
3129:                            throw new JobPersistenceException(
3130:                                    "Couldn't serialize job data: "
3131:                                            + e.getMessage(), e);
3132:                        } catch (SQLException e) {
3133:                            throw new JobPersistenceException(
3134:                                    "Couldn't update job data: "
3135:                                            + e.getMessage(), e);
3136:                        }
3137:                    }
3138:                } catch (SQLException e) {
3139:                    throw new JobPersistenceException(
3140:                            "Couldn't update trigger state(s): "
3141:                                    + e.getMessage(), e);
3142:                }
3143:
3144:                try {
3145:                    getDelegate().deleteFiredTrigger(conn,
3146:                            trigger.getFireInstanceId());
3147:                } catch (SQLException e) {
3148:                    throw new JobPersistenceException(
3149:                            "Couldn't delete fired trigger: " + e.getMessage(),
3150:                            e);
3151:                }
3152:            }
3153:
3154:            /**
3155:             * <P>
3156:             * Get the driver delegate for DB operations.
3157:             * </p>
3158:             */
3159:            protected DriverDelegate getDelegate()
3160:                    throws NoSuchDelegateException {
3161:                if (null == delegate) {
3162:                    try {
3163:                        if (delegateClassName != null) {
3164:                            delegateClass = getClassLoadHelper().loadClass(
3165:                                    delegateClassName);
3166:                        }
3167:
3168:                        Constructor ctor = null;
3169:                        Object[] ctorParams = null;
3170:                        if (canUseProperties()) {
3171:                            Class[] ctorParamTypes = new Class[] { Log.class,
3172:                                    String.class, String.class, Boolean.class };
3173:                            ctor = delegateClass.getConstructor(ctorParamTypes);
3174:                            ctorParams = new Object[] { getLog(), tablePrefix,
3175:                                    instanceId, new Boolean(canUseProperties()) };
3176:                        } else {
3177:                            Class[] ctorParamTypes = new Class[] { Log.class,
3178:                                    String.class, String.class };
3179:                            ctor = delegateClass.getConstructor(ctorParamTypes);
3180:                            ctorParams = new Object[] { getLog(), tablePrefix,
3181:                                    instanceId };
3182:                        }
3183:
3184:                        delegate = (DriverDelegate) ctor
3185:                                .newInstance(ctorParams);
3186:                    } catch (NoSuchMethodException e) {
3187:                        throw new NoSuchDelegateException(
3188:                                "Couldn't find delegate constructor: "
3189:                                        + e.getMessage());
3190:                    } catch (InstantiationException e) {
3191:                        throw new NoSuchDelegateException(
3192:                                "Couldn't create delegate: " + e.getMessage());
3193:                    } catch (IllegalAccessException e) {
3194:                        throw new NoSuchDelegateException(
3195:                                "Couldn't create delegate: " + e.getMessage());
3196:                    } catch (InvocationTargetException e) {
3197:                        throw new NoSuchDelegateException(
3198:                                "Couldn't create delegate: " + e.getMessage());
3199:                    } catch (ClassNotFoundException e) {
3200:                        throw new NoSuchDelegateException(
3201:                                "Couldn't load delegate class: "
3202:                                        + e.getMessage());
3203:                    }
3204:                }
3205:
3206:                return delegate;
3207:            }
3208:
3209:            protected Semaphore getLockHandler() {
3210:                return lockHandler;
3211:            }
3212:
3213:            public void setLockHandler(Semaphore lockHandler) {
3214:                this .lockHandler = lockHandler;
3215:            }
3216:
3217:            //---------------------------------------------------------------------------
3218:            // Management methods
3219:            //---------------------------------------------------------------------------
3220:
3221:            protected RecoverMisfiredJobsResult doRecoverMisfires()
3222:                    throws JobPersistenceException {
3223:                boolean transOwner = false;
3224:                Connection conn = getNonManagedTXConnection();
3225:                try {
3226:                    RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
3227:
3228:                    // Before we make the potentially expensive call to acquire the 
3229:                    // trigger lock, peek ahead to see if it is likely we would find
3230:                    // misfired triggers requiring recovery.
3231:                    int misfireCount = (getDoubleCheckLockMisfireHandler()) ? getDelegate()
3232:                            .countMisfiredTriggersInStates(conn,
3233:                                    STATE_MISFIRED, STATE_WAITING,
3234:                                    getMisfireTime())
3235:                            : Integer.MAX_VALUE;
3236:
3237:                    if (misfireCount == 0) {
3238:                        getLog()
3239:                                .debug(
3240:                                        "Found 0 triggers that missed their scheduled fire-time.");
3241:                    } else {
3242:                        transOwner = getLockHandler().obtainLock(conn,
3243:                                LOCK_TRIGGER_ACCESS);
3244:
3245:                        result = recoverMisfiredJobs(conn, false);
3246:                    }
3247:
3248:                    commitConnection(conn);
3249:                    return result;
3250:                } catch (JobPersistenceException e) {
3251:                    rollbackConnection(conn);
3252:                    throw e;
3253:                } catch (SQLException e) {
3254:                    rollbackConnection(conn);
3255:                    throw new JobPersistenceException(
3256:                            "Database error recovering from misfires.", e);
3257:                } catch (RuntimeException e) {
3258:                    rollbackConnection(conn);
3259:                    throw new JobPersistenceException(
3260:                            "Unexpected runtime exception: " + e.getMessage(),
3261:                            e);
3262:                } finally {
3263:                    try {
3264:                        releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
3265:                    } finally {
3266:                        cleanupConnection(conn);
3267:                    }
3268:                }
3269:            }
3270:
3271:            protected void signalSchedulingChange() {
3272:                signaler.signalSchedulingChange();
3273:            }
3274:
3275:            //---------------------------------------------------------------------------
3276:            // Cluster management methods
3277:            //---------------------------------------------------------------------------
3278:
3279:            protected boolean firstCheckIn = true;
3280:
3281:            protected long lastCheckin = System.currentTimeMillis();
3282:
3283:            protected boolean doCheckin() throws JobPersistenceException {
3284:                boolean transOwner = false;
3285:                boolean transStateOwner = false;
3286:                boolean recovered = false;
3287:
3288:                Connection conn = getNonManagedTXConnection();
3289:                try {
3290:                    // Other than the first time, always checkin first to make sure there is 
3291:                    // work to be done before we aquire the lock (since that is expensive, 
3292:                    // and is almost never necessary).  This must be done in a separate
3293:                    // transaction to prevent a deadlock under recovery conditions.
3294:                    List failedRecords = null;
3295:                    if (firstCheckIn == false) {
3296:                        boolean succeeded = false;
3297:                        try {
3298:                            failedRecords = clusterCheckIn(conn);
3299:                            commitConnection(conn);
3300:                            succeeded = true;
3301:                        } catch (JobPersistenceException e) {
3302:                            rollbackConnection(conn);
3303:                            throw e;
3304:                        } finally {
3305:                            // Only cleanup the connection if we failed and are bailing
3306:                            // as we will otherwise continue to use it.
3307:                            if (succeeded == false) {
3308:                                cleanupConnection(conn);
3309:                            }
3310:                        }
3311:                    }
3312:
3313:                    if (firstCheckIn || (failedRecords.size() > 0)) {
3314:                        getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
3315:                        transStateOwner = true;
3316:
3317:                        // Now that we own the lock, make sure we still have work to do. 
3318:                        // The first time through, we also need to make sure we update/create our state record
3319:                        failedRecords = (firstCheckIn) ? clusterCheckIn(conn)
3320:                                : findFailedInstances(conn);
3321:
3322:                        if (failedRecords.size() > 0) {
3323:                            getLockHandler().obtainLock(conn,
3324:                                    LOCK_TRIGGER_ACCESS);
3325:                            //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
3326:                            transOwner = true;
3327:
3328:                            clusterRecover(conn, failedRecords);
3329:                            recovered = true;
3330:                        }
3331:                    }
3332:
3333:                    commitConnection(conn);
3334:                } catch (JobPersistenceException e) {
3335:                    rollbackConnection(conn);
3336:                    throw e;
3337:                } finally {
3338:                    try {
3339:                        releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
3340:                    } finally {
3341:                        try {
3342:                            releaseLock(conn, LOCK_STATE_ACCESS,
3343:                                    transStateOwner);
3344:                        } finally {
3345:                            cleanupConnection(conn);
3346:                        }
3347:                    }
3348:                }
3349:
3350:                firstCheckIn = false;
3351:
3352:                return recovered;
3353:            }
3354:
3355:            /**
3356:             * Get a list of all scheduler instances in the cluster that may have failed.
3357:             * This includes this scheduler if it is checking in for the first time.
3358:             */
3359:            protected List findFailedInstances(Connection conn)
3360:                    throws JobPersistenceException {
3361:                try {
3362:                    List failedInstances = new LinkedList();
3363:                    boolean foundThisScheduler = false;
3364:                    long timeNow = System.currentTimeMillis();
3365:
3366:                    List states = getDelegate().selectSchedulerStateRecords(
3367:                            conn, null);
3368:
3369:                    for (Iterator itr = states.iterator(); itr.hasNext();) {
3370:                        SchedulerStateRecord rec = (SchedulerStateRecord) itr
3371:                                .next();
3372:
3373:                        // find own record...
3374:                        if (rec.getSchedulerInstanceId()
3375:                                .equals(getInstanceId())) {
3376:                            foundThisScheduler = true;
3377:                            if (firstCheckIn) {
3378:                                failedInstances.add(rec);
3379:                            }
3380:                        } else {
3381:                            // find failed instances...
3382:                            if (calcFailedIfAfter(rec) < timeNow) {
3383:                                failedInstances.add(rec);
3384:                            }
3385:                        }
3386:                    }
3387:
3388:                    // The first time through, also check for orphaned fired triggers.
3389:                    if (firstCheckIn) {
3390:                        failedInstances.addAll(findOrphanedFailedInstances(
3391:                                conn, states));
3392:                    }
3393:
3394:                    // If not the first time but we didn't find our own instance, then
3395:                    // Someone must have done recovery for us.
3396:                    if ((foundThisScheduler == false)
3397:                            && (firstCheckIn == false)) {
3398:                        // TODO: revisit when handle self-failed-out implied (see TODO in clusterCheckIn() below)
3399:                        getLog()
3400:                                .warn(
3401:                                        "This scheduler instance ("
3402:                                                + getInstanceId()
3403:                                                + ") is still "
3404:                                                + "active but was recovered by another instance in the cluster.  "
3405:                                                + "This may cause inconsistent behavior.");
3406:                    }
3407:
3408:                    return failedInstances;
3409:                } catch (Exception e) {
3410:                    lastCheckin = System.currentTimeMillis();
3411:                    throw new JobPersistenceException(
3412:                            "Failure identifying failed instances when checking-in: "
3413:                                    + e.getMessage(), e);
3414:                }
3415:            }
3416:
3417:            /**
3418:             * Create dummy <code>SchedulerStateRecord</code> objects for fired triggers
3419:             * that have no scheduler state record.  Checkin timestamp and interval are
3420:             * left as zero on these dummy <code>SchedulerStateRecord</code> objects.
3421:             * 
3422:             * @param schedulerStateRecords List of all current <code>SchedulerStateRecords</code>
3423:             */
3424:            private List findOrphanedFailedInstances(Connection conn,
3425:                    List schedulerStateRecords) throws SQLException,
3426:                    NoSuchDelegateException {
3427:                List orphanedInstances = new ArrayList();
3428:
3429:                Set allFiredTriggerInstanceNames = getDelegate()
3430:                        .selectFiredTriggerInstanceNames(conn);
3431:                if (allFiredTriggerInstanceNames.isEmpty() == false) {
3432:                    for (Iterator schedulerStateIter = schedulerStateRecords
3433:                            .iterator(); schedulerStateIter.hasNext();) {
3434:                        SchedulerStateRecord rec = (SchedulerStateRecord) schedulerStateIter
3435:                                .next();
3436:
3437:                        allFiredTriggerInstanceNames.remove(rec
3438:                                .getSchedulerInstanceId());
3439:                    }
3440:
3441:                    for (Iterator orphanIter = allFiredTriggerInstanceNames
3442:                            .iterator(); orphanIter.hasNext();) {
3443:
3444:                        SchedulerStateRecord orphanedInstance = new SchedulerStateRecord();
3445:                        orphanedInstance
3446:                                .setSchedulerInstanceId((String) orphanIter
3447:                                        .next());
3448:
3449:                        orphanedInstances.add(orphanedInstance);
3450:
3451:                        getLog().warn(
3452:                                "Found orphaned fired triggers for instance: "
3453:                                        + orphanedInstance
3454:                                                .getSchedulerInstanceId());
3455:                    }
3456:                }
3457:
3458:                return orphanedInstances;
3459:            }
3460:
3461:            protected long calcFailedIfAfter(SchedulerStateRecord rec) {
3462:                return rec.getCheckinTimestamp()
3463:                        + Math.max(rec.getCheckinInterval(), (System
3464:                                .currentTimeMillis() - lastCheckin)) + 7500L;
3465:            }
3466:
3467:            protected List clusterCheckIn(Connection conn)
3468:                    throws JobPersistenceException {
3469:
3470:                List failedInstances = findFailedInstances(conn);
3471:
3472:                try {
3473:                    // TODO: handle self-failed-out
3474:
3475:                    // check in...
3476:                    lastCheckin = System.currentTimeMillis();
3477:                    if (getDelegate().updateSchedulerState(conn,
3478:                            getInstanceId(), lastCheckin) == 0) {
3479:                        getDelegate().insertSchedulerState(conn,
3480:                                getInstanceId(), lastCheckin,
3481:                                getClusterCheckinInterval());
3482:                    }
3483:
3484:                } catch (Exception e) {
3485:                    throw new JobPersistenceException(
3486:                            "Failure updating scheduler state when checking-in: "
3487:                                    + e.getMessage(), e);
3488:                }
3489:
3490:                return failedInstances;
3491:            }
3492:
3493:            protected void clusterRecover(Connection conn, List failedInstances)
3494:                    throws JobPersistenceException {
3495:
3496:                if (failedInstances.size() > 0) {
3497:
3498:                    long recoverIds = System.currentTimeMillis();
3499:
3500:                    logWarnIfNonZero(failedInstances.size(),
3501:                            "ClusterManager: detected "
3502:                                    + failedInstances.size()
3503:                                    + " failed or restarted instances.");
3504:                    try {
3505:                        Iterator itr = failedInstances.iterator();
3506:                        while (itr.hasNext()) {
3507:                            SchedulerStateRecord rec = (SchedulerStateRecord) itr
3508:                                    .next();
3509:
3510:                            getLog().info(
3511:                                    "ClusterManager: Scanning for instance \""
3512:                                            + rec.getSchedulerInstanceId()
3513:                                            + "\"'s failed in-progress jobs.");
3514:
3515:                            List firedTriggerRecs = getDelegate()
3516:                                    .selectInstancesFiredTriggerRecords(conn,
3517:                                            rec.getSchedulerInstanceId());
3518:
3519:                            int acquiredCount = 0;
3520:                            int recoveredCount = 0;
3521:                            int otherCount = 0;
3522:
3523:                            Set triggerKeys = new HashSet();
3524:
3525:                            Iterator ftItr = firedTriggerRecs.iterator();
3526:                            while (ftItr.hasNext()) {
3527:                                FiredTriggerRecord ftRec = (FiredTriggerRecord) ftItr
3528:                                        .next();
3529:
3530:                                Key tKey = ftRec.getTriggerKey();
3531:                                Key jKey = ftRec.getJobKey();
3532:
3533:                                triggerKeys.add(tKey);
3534:
3535:                                // release blocked triggers..
3536:                                if (ftRec.getFireInstanceState().equals(
3537:                                        STATE_BLOCKED)) {
3538:                                    getDelegate()
3539:                                            .updateTriggerStatesForJobFromOtherState(
3540:                                                    conn, jKey.getName(),
3541:                                                    jKey.getGroup(),
3542:                                                    STATE_WAITING,
3543:                                                    STATE_BLOCKED);
3544:                                } else if (ftRec.getFireInstanceState().equals(
3545:                                        STATE_PAUSED_BLOCKED)) {
3546:                                    getDelegate()
3547:                                            .updateTriggerStatesForJobFromOtherState(
3548:                                                    conn, jKey.getName(),
3549:                                                    jKey.getGroup(),
3550:                                                    STATE_PAUSED,
3551:                                                    STATE_PAUSED_BLOCKED);
3552:                                }
3553:
3554:                                // release acquired triggers..
3555:                                if (ftRec.getFireInstanceState().equals(
3556:                                        STATE_ACQUIRED)) {
3557:                                    getDelegate()
3558:                                            .updateTriggerStateFromOtherState(
3559:                                                    conn, tKey.getName(),
3560:                                                    tKey.getGroup(),
3561:                                                    STATE_WAITING,
3562:                                                    STATE_ACQUIRED);
3563:                                    acquiredCount++;
3564:                                } else if (ftRec.isJobRequestsRecovery()) {
3565:                                    // handle jobs marked for recovery that were not fully
3566:                                    // executed..
3567:                                    if (jobExists(conn, jKey.getName(), jKey
3568:                                            .getGroup())) {
3569:                                        SimpleTrigger rcvryTrig = new SimpleTrigger(
3570:                                                "recover_"
3571:                                                        + rec
3572:                                                                .getSchedulerInstanceId()
3573:                                                        + "_"
3574:                                                        + String
3575:                                                                .valueOf(recoverIds++),
3576:                                                Scheduler.DEFAULT_RECOVERY_GROUP,
3577:                                                new Date(ftRec
3578:                                                        .getFireTimestamp()));
3579:                                        rcvryTrig.setJobName(jKey.getName());
3580:                                        rcvryTrig.setJobGroup(jKey.getGroup());
3581:                                        rcvryTrig
3582:                                                .setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW);
3583:                                        rcvryTrig.setPriority(ftRec
3584:                                                .getPriority());
3585:                                        JobDataMap jd = getDelegate()
3586:                                                .selectTriggerJobDataMap(conn,
3587:                                                        tKey.getName(),
3588:                                                        tKey.getGroup());
3589:                                        jd
3590:                                                .put(
3591:                                                        Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME,
3592:                                                        tKey.getName());
3593:                                        jd
3594:                                                .put(
3595:                                                        Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP,
3596:                                                        tKey.getGroup());
3597:                                        jd
3598:                                                .put(
3599:                                                        Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS,
3600:                                                        String
3601:                                                                .valueOf(ftRec
3602:                                                                        .getFireTimestamp()));
3603:                                        rcvryTrig.setJobDataMap(jd);
3604:
3605:                                        rcvryTrig.computeFirstFireTime(null);
3606:                                        storeTrigger(conn, null, rcvryTrig,
3607:                                                null, false, STATE_WAITING,
3608:                                                false, true);
3609:                                        recoveredCount++;
3610:                                    } else {
3611:                                        getLog()
3612:                                                .warn(
3613:                                                        "ClusterManager: failed job '"
3614:                                                                + jKey
3615:                                                                + "' no longer exists, cannot schedule recovery.");
3616:                                        otherCount++;
3617:                                    }
3618:                                } else {
3619:                                    otherCount++;
3620:                                }
3621:
3622:                                // free up stateful job's triggers
3623:                                if (ftRec.isJobIsStateful()) {
3624:                                    getDelegate()
3625:                                            .updateTriggerStatesForJobFromOtherState(
3626:                                                    conn, jKey.getName(),
3627:                                                    jKey.getGroup(),
3628:                                                    STATE_WAITING,
3629:                                                    STATE_BLOCKED);
3630:                                    getDelegate()
3631:                                            .updateTriggerStatesForJobFromOtherState(
3632:                                                    conn, jKey.getName(),
3633:                                                    jKey.getGroup(),
3634:                                                    STATE_PAUSED,
3635:                                                    STATE_PAUSED_BLOCKED);
3636:                                }
3637:                            }
3638:
3639:                            getDelegate().deleteFiredTriggers(conn,
3640:                                    rec.getSchedulerInstanceId());
3641:
3642:                            // Check if any of the fired triggers we just deleted were the last fired trigger
3643:                            // records of a COMPLETE trigger.
3644:                            int completeCount = 0;
3645:                            for (Iterator triggerKeyIter = triggerKeys
3646:                                    .iterator(); triggerKeyIter.hasNext();) {
3647:                                Key triggerKey = (Key) triggerKeyIter.next();
3648:
3649:                                if (getDelegate().selectTriggerState(conn,
3650:                                        triggerKey.getName(),
3651:                                        triggerKey.getGroup()).equals(
3652:                                        STATE_COMPLETE)) {
3653:                                    List firedTriggers = getDelegate()
3654:                                            .selectFiredTriggerRecords(conn,
3655:                                                    triggerKey.getName(),
3656:                                                    triggerKey.getGroup());
3657:                                    if (firedTriggers.isEmpty()) {
3658:                                        SchedulingContext schedulingContext = new SchedulingContext();
3659:                                        schedulingContext
3660:                                                .setInstanceId(instanceId);
3661:
3662:                                        if (removeTrigger(conn,
3663:                                                schedulingContext, triggerKey
3664:                                                        .getName(), triggerKey
3665:                                                        .getGroup())) {
3666:                                            completeCount++;
3667:                                        }
3668:                                    }
3669:                                }
3670:                            }
3671:
3672:                            logWarnIfNonZero(acquiredCount,
3673:                                    "ClusterManager: ......Freed "
3674:                                            + acquiredCount
3675:                                            + " acquired trigger(s).");
3676:                            logWarnIfNonZero(completeCount,
3677:                                    "ClusterManager: ......Deleted "
3678:                                            + completeCount
3679:                                            + " complete triggers(s).");
3680:                            logWarnIfNonZero(
3681:                                    recoveredCount,
3682:                                    "ClusterManager: ......Scheduled "
3683:                                            + recoveredCount
3684:                                            + " recoverable job(s) for recovery.");
3685:                            logWarnIfNonZero(otherCount,
3686:                                    "ClusterManager: ......Cleaned-up "
3687:                                            + otherCount
3688:                                            + " other failed job(s).");
3689:
3690:                            if (rec.getSchedulerInstanceId().equals(
3691:                                    getInstanceId()) == false) {
3692:                                getDelegate().deleteSchedulerState(conn,
3693:                                        rec.getSchedulerInstanceId());
3694:                            }
3695:                        }
3696:                    } catch (Exception e) {
3697:                        throw new JobPersistenceException(
3698:                                "Failure recovering jobs: " + e.getMessage(), e);
3699:                    }
3700:                }
3701:            }
3702:
3703:            protected void logWarnIfNonZero(int val, String warning) {
3704:                if (val > 0) {
3705:                    getLog().info(warning);
3706:                } else {
3707:                    getLog().debug(warning);
3708:                }
3709:            }
3710:
3711:            /**
3712:             * <p>
3713:             * Cleanup the given database connection.  This means restoring
3714:             * any modified auto commit or transaction isolation connection
3715:             * attributes, and then closing the underlying connection.
3716:             * </p>
3717:             * 
3718:             * <p>
3719:             * This is separate from closeConnection() because the Spring 
3720:             * integration relies on being able to overload closeConnection() and
3721:             * expects the same connection back that it originally returned
3722:             * from the datasource. 
3723:             * </p>
3724:             * 
3725:             * @see #closeConnection(Connection)
3726:             */
3727:            protected void cleanupConnection(Connection conn) {
3728:                if (conn != null) {
3729:                    if (conn instanceof  Proxy) {
3730:                        Proxy connProxy = (Proxy) conn;
3731:
3732:                        InvocationHandler invocationHandler = Proxy
3733:                                .getInvocationHandler(connProxy);
3734:                        if (invocationHandler instanceof  AttributeRestoringConnectionInvocationHandler) {
3735:                            AttributeRestoringConnectionInvocationHandler connHandler = (AttributeRestoringConnectionInvocationHandler) invocationHandler;
3736:
3737:                            connHandler.restoreOriginalAtributes();
3738:                            closeConnection(connHandler.getWrappedConnection());
3739:                            return;
3740:                        }
3741:                    }
3742:
3743:                    // Wan't a Proxy, or was a Proxy, but wasn't ours.
3744:                    closeConnection(conn);
3745:                }
3746:            }
3747:
3748:            /**
3749:             * Closes the supplied <code>Connection</code>.
3750:             * <p>
3751:             * Ignores a <code>null Connection</code>.  
3752:             * Any exception thrown trying to close the <code>Connection</code> is
3753:             * logged and ignored.  
3754:             * </p>
3755:             * 
3756:             * @param conn The <code>Connection</code> to close (Optional).
3757:             */
3758:            protected void closeConnection(Connection conn) {
3759:                if (conn != null) {
3760:                    try {
3761:                        conn.close();
3762:                    } catch (SQLException e) {
3763:                        getLog().error("Failed to close Connection", e);
3764:                    } catch (Throwable e) {
3765:                        getLog()
3766:                                .error(
3767:                                        "Unexpected exception closing Connection."
3768:                                                + "  This is often due to a Connection being returned after or during shutdown.",
3769:                                        e);
3770:                    }
3771:                }
3772:            }
3773:
3774:            /**
3775:             * Rollback the supplied connection.
3776:             * 
3777:             * <p>  
3778:             * Logs any SQLException it gets trying to rollback, but will not propogate
3779:             * the exception lest it mask the exception that caused the caller to 
3780:             * need to rollback in the first place.
3781:             * </p>
3782:             *
3783:             * @param conn (Optional)
3784:             */
3785:            protected void rollbackConnection(Connection conn) {
3786:                if (conn != null) {
3787:                    try {
3788:                        conn.rollback();
3789:                    } catch (SQLException e) {
3790:                        getLog().error(
3791:                                "Couldn't rollback jdbc connection. "
3792:                                        + e.getMessage(), e);
3793:                    }
3794:                }
3795:            }
3796:
3797:            /**
3798:             * Commit the supplied connection
3799:             *
3800:             * @param conn (Optional)
3801:             * @throws JobPersistenceException thrown if a SQLException occurs when the
3802:             * connection is committed
3803:             */
3804:            protected void commitConnection(Connection conn)
3805:                    throws JobPersistenceException {
3806:
3807:                if (conn != null) {
3808:                    try {
3809:                        conn.commit();
3810:                    } catch (SQLException e) {
3811:                        throw new JobPersistenceException(
3812:                                "Couldn't commit jdbc connection. "
3813:                                        + e.getMessage(), e);
3814:                    }
3815:                }
3816:            }
3817:
3818:            /**
3819:             * Implement this interface to provide the code to execute within
3820:             * the a transaction template.  If no return value is required, execute
3821:             * should just return null.
3822:             * 
3823:             * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
3824:             * @see JobStoreSupport#executeInLock(String, TransactionCallback)
3825:             * @see JobStoreSupport#executeWithoutLock(TransactionCallback)
3826:             */
3827:            protected interface TransactionCallback {
3828:                Object execute(Connection conn) throws JobPersistenceException;
3829:            }
3830:
3831:            /**
3832:             * Implement this interface to provide the code to execute within
3833:             * the a transaction template that has no return value.
3834:             * 
3835:             * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
3836:             */
3837:            protected interface VoidTransactionCallback {
3838:                void execute(Connection conn) throws JobPersistenceException;
3839:            }
3840:
3841:            /**
3842:             * Execute the given callback in a transaction. Depending on the JobStore, 
3843:             * the surrounding transaction may be assumed to be already present 
3844:             * (managed).  
3845:             * 
3846:             * <p>
3847:             * This method just forwards to executeInLock() with a null lockName.
3848:             * </p>
3849:             * 
3850:             * @see #executeInLock(String, TransactionCallback)
3851:             */
3852:            public Object executeWithoutLock(TransactionCallback txCallback)
3853:                    throws JobPersistenceException {
3854:                return executeInLock(null, txCallback);
3855:            }
3856:
3857:            /**
3858:             * Execute the given callback having aquired the given lock.  
3859:             * Depending on the JobStore, the surrounding transaction may be 
3860:             * assumed to be already present (managed).  This version is just a 
3861:             * handy wrapper around executeInLock that doesn't require a return
3862:             * value.
3863:             * 
3864:             * @param lockName The name of the lock to aquire, for example 
3865:             * "TRIGGER_ACCESS".  If null, then no lock is aquired, but the
3866:             * lockCallback is still executed in a transaction. 
3867:             * 
3868:             * @see #executeInLock(String, TransactionCallback)
3869:             */
3870:            protected void executeInLock(final String lockName,
3871:                    final VoidTransactionCallback txCallback)
3872:                    throws JobPersistenceException {
3873:                executeInLock(lockName, new TransactionCallback() {
3874:                    public Object execute(Connection conn)
3875:                            throws JobPersistenceException {
3876:                        txCallback.execute(conn);
3877:                        return null;
3878:                    }
3879:                });
3880:            }
3881:
3882:            /**
3883:             * Execute the given callback having aquired the given lock.  
3884:             * Depending on the JobStore, the surrounding transaction may be 
3885:             * assumed to be already present (managed).
3886:             * 
3887:             * @param lockName The name of the lock to aquire, for example 
3888:             * "TRIGGER_ACCESS".  If null, then no lock is aquired, but the
3889:             * lockCallback is still executed in a transaction. 
3890:             */
3891:            protected abstract Object executeInLock(String lockName,
3892:                    TransactionCallback txCallback)
3893:                    throws JobPersistenceException;
3894:
3895:            /**
3896:             * Execute the given callback having optionally aquired the given lock.
3897:             * This uses the non-managed transaction connection.  This version is just a 
3898:             * handy wrapper around executeInNonManagedTXLock that doesn't require a return
3899:             * value.
3900:             * 
3901:             * @param lockName The name of the lock to aquire, for example 
3902:             * "TRIGGER_ACCESS".  If null, then no lock is aquired, but the
3903:             * lockCallback is still executed in a non-managed transaction. 
3904:             * 
3905:             * @see #executeInNonManagedTXLock(String, TransactionCallback)
3906:             */
3907:            protected void executeInNonManagedTXLock(final String lockName,
3908:                    final VoidTransactionCallback txCallback)
3909:                    throws JobPersistenceException {
3910:                executeInNonManagedTXLock(lockName, new TransactionCallback() {
3911:                    public Object execute(Connection conn)
3912:                            throws JobPersistenceException {
3913:                        txCallback.execute(conn);
3914:                        return null;
3915:                    }
3916:                });
3917:            }
3918:
3919:            /**
3920:             * Execute the given callback having optionally aquired the given lock.
3921:             * This uses the non-managed transaction connection.
3922:             * 
3923:             * @param lockName The name of the lock to aquire, for example 
3924:             * "TRIGGER_ACCESS".  If null, then no lock is aquired, but the
3925:             * lockCallback is still executed in a non-managed transaction. 
3926:             */
3927:            protected Object executeInNonManagedTXLock(String lockName,
3928:                    TransactionCallback txCallback)
3929:                    throws JobPersistenceException {
3930:                boolean transOwner = false;
3931:                Connection conn = null;
3932:                try {
3933:                    if (lockName != null) {
3934:                        // If we aren't using db locks, then delay getting DB connection 
3935:                        // until after aquiring the lock since it isn't needed.
3936:                        if (getLockHandler().requiresConnection()) {
3937:                            conn = getNonManagedTXConnection();
3938:                        }
3939:
3940:                        transOwner = getLockHandler()
3941:                                .obtainLock(conn, lockName);
3942:                    }
3943:
3944:                    if (conn == null) {
3945:                        conn = getNonManagedTXConnection();
3946:                    }
3947:
3948:                    Object result = txCallback.execute(conn);
3949:                    commitConnection(conn);
3950:                    return result;
3951:                } catch (JobPersistenceException e) {
3952:                    rollbackConnection(conn);
3953:                    throw e;
3954:                } catch (RuntimeException e) {
3955:                    rollbackConnection(conn);
3956:                    throw new JobPersistenceException(
3957:                            "Unexpected runtime exception: " + e.getMessage(),
3958:                            e);
3959:                } finally {
3960:                    try {
3961:                        releaseLock(conn, lockName, transOwner);
3962:                    } finally {
3963:                        cleanupConnection(conn);
3964:                    }
3965:                }
3966:            }
3967:
3968:            /////////////////////////////////////////////////////////////////////////////
3969:            //
3970:            // ClusterManager Thread
3971:            //
3972:            /////////////////////////////////////////////////////////////////////////////
3973:
3974:            class ClusterManager extends Thread {
3975:
3976:                private boolean shutdown = false;
3977:
3978:                private int numFails = 0;
3979:
3980:                ClusterManager() {
3981:                    this .setPriority(Thread.NORM_PRIORITY + 2);
3982:                    this .setName("QuartzScheduler_" + instanceName + "-"
3983:                            + instanceId + "_ClusterManager");
3984:                    this .setDaemon(getMakeThreadsDaemons());
3985:                }
3986:
3987:                public void initialize() {
3988:                    this .manage();
3989:                    this .start();
3990:                }
3991:
3992:                public void shutdown() {
3993:                    shutdown = true;
3994:                    this .interrupt();
3995:                }
3996:
3997:                private boolean manage() {
3998:                    boolean res = false;
3999:                    try {
4000:
4001:                        res = doCheckin();
4002:
4003:                        numFails = 0;
4004:                        getLog().debug("ClusterManager: Check-in complete.");
4005:                    } catch (Exception e) {
4006:                        if (numFails % 4 == 0) {
4007:                            getLog().error(
4008:                                    "ClusterManager: Error managing cluster: "
4009:                                            + e.getMessage(), e);
4010:                        }
4011:                        numFails++;
4012:                    }
4013:                    return res;
4014:                }
4015:
4016:                public void run() {
4017:                    while (!shutdown) {
4018:
4019:                        if (!shutdown) {
4020:                            long timeToSleep = getClusterCheckinInterval();
4021:                            long transpiredTime = (System.currentTimeMillis() - lastCheckin);
4022:                            timeToSleep = timeToSleep - transpiredTime;
4023:                            if (timeToSleep <= 0) {
4024:                                timeToSleep = 100L;
4025:                            }
4026:
4027:                            if (numFails > 0) {
4028:                                timeToSleep = Math.max(getDbRetryInterval(),
4029:                                        timeToSleep);
4030:                            }
4031:
4032:                            try {
4033:                                Thread.sleep(timeToSleep);
4034:                            } catch (Exception ignore) {
4035:                            }
4036:                        }
4037:
4038:                        if (!shutdown && this .manage()) {
4039:                            signalSchedulingChange();
4040:                        }
4041:
4042:                    }//while !shutdown
4043:                }
4044:            }
4045:
4046:            /////////////////////////////////////////////////////////////////////////////
4047:            //
4048:            // MisfireHandler Thread
4049:            //
4050:            /////////////////////////////////////////////////////////////////////////////
4051:
4052:            class MisfireHandler extends Thread {
4053:
4054:                private boolean shutdown = false;
4055:
4056:                private int numFails = 0;
4057:
4058:                MisfireHandler() {
4059:                    this .setName("QuartzScheduler_" + instanceName + "-"
4060:                            + instanceId + "_MisfireHandler");
4061:                    this .setDaemon(getMakeThreadsDaemons());
4062:                }
4063:
4064:                public void initialize() {
4065:                    //this.manage();
4066:                    this .start();
4067:                }
4068:
4069:                public void shutdown() {
4070:                    shutdown = true;
4071:                    this .interrupt();
4072:                }
4073:
4074:                private RecoverMisfiredJobsResult manage() {
4075:                    try {
4076:                        getLog().debug(
4077:                                "MisfireHandler: scanning for misfires...");
4078:
4079:                        RecoverMisfiredJobsResult res = doRecoverMisfires();
4080:                        numFails = 0;
4081:                        return res;
4082:                    } catch (Exception e) {
4083:                        if (numFails % 4 == 0) {
4084:                            getLog().error(
4085:                                    "MisfireHandler: Error handling misfires: "
4086:                                            + e.getMessage(), e);
4087:                        }
4088:                        numFails++;
4089:                    }
4090:                    return RecoverMisfiredJobsResult.NO_OP;
4091:                }
4092:
4093:                public void run() {
4094:
4095:                    while (!shutdown) {
4096:
4097:                        long sTime = System.currentTimeMillis();
4098:
4099:                        RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
4100:
4101:                        if (recoverMisfiredJobsResult
4102:                                .getProcessedMisfiredTriggerCount() > 0) {
4103:                            signalSchedulingChange();
4104:                        }
4105:
4106:                        if (!shutdown) {
4107:                            long timeToSleep = 50l; // At least a short pause to help balance threads
4108:                            if (!recoverMisfiredJobsResult
4109:                                    .hasMoreMisfiredTriggers()) {
4110:                                timeToSleep = getMisfireThreshold()
4111:                                        - (System.currentTimeMillis() - sTime);
4112:                                if (timeToSleep <= 0) {
4113:                                    timeToSleep = 50l;
4114:                                }
4115:
4116:                                if (numFails > 0) {
4117:                                    timeToSleep = Math.max(
4118:                                            getDbRetryInterval(), timeToSleep);
4119:                                }
4120:                            }
4121:
4122:                            try {
4123:                                Thread.sleep(timeToSleep);
4124:                            } catch (Exception ignore) {
4125:                            }
4126:                        }//while !shutdown
4127:                    }
4128:                }
4129:            }
4130:        }
4131:
4132:        // EOF
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.