001: /*
002: Copyright (C) 2003 Know Gate S.L. All rights reserved.
003: C/Oña, 107 1º2 28050 Madrid (Spain)
004:
005: Redistribution and use in source and binary forms, with or without
006: modification, are permitted provided that the following conditions
007: are met:
008:
009: 1. Redistributions of source code must retain the above copyright
010: notice, this list of conditions and the following disclaimer.
011:
012: 2. The end-user documentation included with the redistribution,
013: if any, must include the following acknowledgment:
014: "This product includes software parts from hipergate
015: (http://www.hipergate.org/)."
016: Alternately, this acknowledgment may appear in the software itself,
017: if and wherever such third-party acknowledgments normally appear.
018:
019: 3. The name hipergate must not be used to endorse or promote products
020: derived from this software without prior written permission.
021: Products derived from this software may not be called hipergate,
022: nor may hipergate appear in their name, without prior written
023: permission.
024:
025: This library is distributed in the hope that it will be useful,
026: but WITHOUT ANY WARRANTY; without even the implied warranty of
027: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
028:
029: You should have received a copy of hipergate License with this code;
030: if not, visit http://www.hipergate.org or mail to info@hipergate.org
031: */
032:
033: package com.knowgate.scheduler;
034:
035: import java.io.File;
036: import java.sql.DriverManager;
037: import java.sql.Connection;
038: import java.sql.SQLException;
039: import java.sql.Connection;
040: import java.sql.PreparedStatement;
041: import java.sql.ResultSet;
042: import java.sql.Statement;
043: import java.sql.Types;
044:
045: import java.util.Date;
046: import java.util.Properties;
047: import java.util.LinkedList;
048: import java.util.ListIterator;
049:
050: import java.io.FileInputStream;
051: import java.io.IOException;
052: import java.io.FileNotFoundException;
053:
054: import com.knowgate.debug.DebugFile;
055: import com.knowgate.dataobjs.DBBind;
056: import com.knowgate.jdc.JDCConnection;
057: import com.knowgate.dataobjs.DB;
058:
059: /**
060: * <p>Scheduler daemon</p>
061: * <p>Keeps a thread pool and an atom queue for feeding the pool.</p>
062: * @author Sergio Montoro Ten
063: * @version 3.0
064: */
065:
066: public class SchedulerDaemon extends Thread {
067:
068: private boolean bContinue;
069:
070: private String sProfile;
071:
072: // Worker threads pool
073: private WorkerThreadPool oThreadPool;
074:
075: private DBBind oDbb;
076:
077: // This queue will be an in-memory list
078: // of pending atoms (messages) to send
079: private AtomQueue oQue = new AtomQueue();
080:
081: // Environment properties (typically readed from hipergate.cnf)
082: private Properties oEnvProps;
083:
084: private LinkedList oCallbacks;
085:
086: private Date dtCreationDate;
087: private Date dtStartDate;
088: private Date dtStopDate;
089:
090: // ---------------------------------------------------------------------------
091:
092: private static class SystemOutNotify extends WorkerThreadCallback {
093:
094: public SystemOutNotify() {
095: super ("SystemOutNotify");
096: }
097:
098: public void call(String sThreadId, int iOpCode,
099: String sMessage, Exception oXcpt, Object oParam) {
100:
101: if (WorkerThreadCallback.WT_EXCEPTION == iOpCode)
102: System.out.println("Thread " + sThreadId + ": ERROR "
103: + sMessage);
104: else
105: System.out.println("Thread " + sThreadId + ": "
106: + sMessage);
107: }
108: }
109:
110: // ---------------------------------------------------------------------------
111:
112: /**
113: * <p>Create new SchedulerDaemon</p>
114: * @param sPropertiesFilePath Full path to hipergate.cnf file.<br>
115: * Constructor will read the following properties from hipergate.cnf:<br>
116: * <b>driver</b> JDBC driver class<br>
117: * <b>dburl</b> URL for database connection<br>
118: * <b>dbuser</b> Database User<br>
119: * <b>dbpassword</b> Database User Password<br>
120: * @throws ClassNotFoundException
121: * @throws FileNotFoundException
122: * @throws IOException
123: * @throws SQLException
124: */
125: public SchedulerDaemon(String sPropertiesFilePath)
126: throws ClassNotFoundException, FileNotFoundException,
127: IOException, SQLException {
128:
129: dtStartDate = dtStopDate = null;
130:
131: dtCreationDate = new Date();
132:
133: oThreadPool = null;
134:
135: oDbb = null;
136:
137: bContinue = true;
138:
139: if (DebugFile.trace) {
140: DebugFile.writeln("new FileInputStream("
141: + sPropertiesFilePath + ")");
142: }
143:
144: FileInputStream oInProps = new FileInputStream(
145: sPropertiesFilePath);
146: oEnvProps = new Properties();
147: oEnvProps.load(oInProps);
148: oInProps.close();
149:
150: oCallbacks = new LinkedList();
151:
152: sProfile = sPropertiesFilePath.substring(sPropertiesFilePath
153: .lastIndexOf(File.separator) + 1, sPropertiesFilePath
154: .lastIndexOf('.'));
155:
156: } // SchedulerDaemon
157:
158: // ---------------------------------------------------------------------------
159:
160: /**
161: * Get date when this SchedulerDaemon was created
162: * @return Date
163: */
164: public Date creationDate() {
165: return dtCreationDate;
166: }
167:
168: // ---------------------------------------------------------------------------
169:
170: /**
171: * Get date when this SchedulerDaemon was started for the last time
172: * @return Date
173: */
174: public Date startDate() {
175: return dtStartDate;
176: }
177:
178: // ---------------------------------------------------------------------------
179:
180: /**
181: * Get date when this SchedulerDaemon was stopped for the last time
182: * @return Date
183: */
184: public Date stopDate() {
185: return dtStopDate;
186: }
187:
188: // ---------------------------------------------------------------------------
189:
190: public AtomQueue atomQueue() {
191: return oQue;
192: }
193:
194: // ---------------------------------------------------------------------------
195:
196: public WorkerThreadPool threadPool() {
197: return oThreadPool;
198: }
199:
200: // ---------------------------------------------------------------------------
201:
202: /**
203: * <p>Create AtomQueue and start WorkerThreadPool</p>
204: */
205:
206: public void run() {
207: Statement oStmt;
208: ResultSet oRSet;
209: int iJobCount;
210: String sSQL;
211: AtomConsumer oCsr = null;
212: JDCConnection oCon = null;
213:
214: if (DebugFile.trace)
215: DebugFile.writeln("Begin SchedulerDaemon.run()");
216:
217: try {
218:
219: if (null == oDbb)
220: oDbb = new DBBind(sProfile);
221:
222: oCon = oDbb.getConnection("SchedulerDaemon");
223:
224: if (DebugFile.trace)
225: DebugFile.writeln("JDCConnection.setAutoCommit(true)");
226:
227: oCon.setAutoCommit(true);
228:
229: // Create Atom queue.
230: if (DebugFile.trace)
231: DebugFile.writeln("new AtomQueue()");
232:
233: oQue = new AtomQueue();
234:
235: // This object feeds the queue with new atoms
236: // extracted from the database.
237: if (DebugFile.trace)
238: DebugFile.writeln("new AtomFeeder()");
239:
240: AtomFeeder oFdr = new AtomFeeder();
241:
242: // This is the queue consumer object
243: // it grants that only one atom is
244: // poped from the queue at a time.
245: if (DebugFile.trace)
246: DebugFile
247: .writeln("new AtomConsumer([JDCconnection], [AtomQueue])");
248:
249: oCsr = new AtomConsumer(oCon, oQue);
250:
251: // Create WorkerThreadPool
252:
253: if (DebugFile.trace)
254: DebugFile
255: .writeln("new WorkerThreadPool([AtomConsumer], [Properties])");
256:
257: oThreadPool = new WorkerThreadPool(oCsr, oEnvProps);
258:
259: // Register callbacks on each worker thread
260:
261: ListIterator oIter = oCallbacks.listIterator();
262: while (oIter.hasNext())
263: oThreadPool
264: .registerCallback((WorkerThreadCallback) oIter
265: .next());
266:
267: dtStartDate = new Date();
268:
269: do {
270: try {
271:
272: while (bContinue) {
273: // Count how many atoms are pending of processing at the database
274: oStmt = oCon.createStatement(
275: ResultSet.TYPE_FORWARD_ONLY,
276: ResultSet.CONCUR_READ_ONLY);
277:
278: try {
279: oStmt.setQueryTimeout(20);
280: } catch (SQLException sqle) {
281: }
282:
283: // ***************************************************
284: // Finish all the jobs that have no more pending atoms
285: sSQL = "SELECT j.gu_job FROM k_jobs j WHERE ("
286: + "j.id_status="
287: + String.valueOf(Job.STATUS_PENDING)
288: + " OR "
289: + "j.id_status="
290: + String.valueOf(Job.STATUS_RUNNING)
291: + ") AND "
292: + "NOT EXISTS (SELECT a.pg_atom FROM k_job_atoms a WHERE "
293: + "j.gu_job=a.gu_job AND a.id_status IN ("
294: + String.valueOf(Atom.STATUS_PENDING)
295: + ","
296: + String.valueOf(Atom.STATUS_RUNNING)
297: + ","
298: + String.valueOf(Atom.STATUS_SUSPENDED)
299: + "))";
300:
301: if (DebugFile.trace)
302: DebugFile.writeln("Statement.executeQuery("
303: + sSQL + ")");
304:
305: oRSet = oStmt.executeQuery(sSQL);
306: LinkedList oFinished = new LinkedList();
307: while (oRSet.next()) {
308: oFinished.add(oRSet.getString(1));
309: } // wend
310: oRSet.close();
311:
312: if (DebugFile.trace)
313: DebugFile.writeln("Already finished jobs "
314: + String.valueOf(oFinished.size()));
315:
316: if (oFinished.size() > 0) {
317: sSQL = "UPDATE k_jobs SET id_status="
318: + String
319: .valueOf(Job.STATUS_FINISHED)
320: + ",dt_finished="
321: + DBBind.Functions.GETDATE
322: + " WHERE gu_job=?";
323: if (DebugFile.trace)
324: DebugFile
325: .writeln("Connection.prepareStatement("
326: + sSQL + ")");
327: PreparedStatement oUpdt = oCon
328: .prepareStatement(sSQL);
329: oIter = oFinished.listIterator();
330: while (oIter.hasNext()) {
331: oUpdt.setObject(1, oIter.next(),
332: java.sql.Types.CHAR);
333: oUpdt.executeUpdate();
334: } // wend
335: oUpdt.close();
336: } // fi
337:
338: // ****************************************
339: // Count jobs pending of begining execution
340:
341: if (DebugFile.trace)
342: DebugFile
343: .writeln("Statement.executeQuery(SELECT COUNT(*) FROM k_jobs WHERE id_status="
344: + String
345: .valueOf(Job.STATUS_PENDING)
346: + ")");
347:
348: oRSet = oStmt
349: .executeQuery("SELECT COUNT(*) FROM k_jobs WHERE id_status="
350: + String
351: .valueOf(Job.STATUS_PENDING));
352: oRSet.next();
353: iJobCount = oRSet.getInt(1);
354: oRSet.close();
355: oStmt.close();
356:
357: if (DebugFile.trace)
358: DebugFile.writeln(String.valueOf(iJobCount)
359: + " pending jobs");
360:
361: if (0 == iJobCount)
362: sleep(10000);
363: else
364: break;
365: } // wend
366:
367: if (bContinue) {
368: oFdr.loadAtoms(oCon, oThreadPool.size());
369:
370: oFdr.feedQueue(oCon, oQue);
371:
372: if (oQue.size() > 0)
373: oThreadPool.launchAll();
374:
375: do {
376:
377: sleep(10000);
378:
379: if (DebugFile.trace)
380: DebugFile.writeln(String
381: .valueOf(oThreadPool
382: .livethreads())
383: + " live threads");
384:
385: } while (oThreadPool.livethreads() == oThreadPool
386: .size());
387: } // fi (bContinue)
388: } catch (InterruptedException e) {
389: if (DebugFile.trace)
390: DebugFile
391: .writeln("SchedulerDaemon InterruptedException "
392: + e.getMessage());
393: }
394: } while (bContinue);
395:
396: if (DebugFile.trace)
397: DebugFile.writeln(" exiting SchedulerDaemon");
398:
399: oThreadPool.haltAll();
400: oThreadPool = null;
401:
402: oCsr.close();
403: oCsr = null;
404:
405: oFdr = null;
406: oQue = null;
407:
408: if (DebugFile.trace)
409: DebugFile.writeln("JDConnection.close()");
410:
411: oCon.close("SchedulerDaemon");
412: oCon = null;
413:
414: oDbb.close();
415: oDbb = null;
416: } catch (SQLException e) {
417: try {
418: oThreadPool.haltAll();
419: oThreadPool = null;
420: } catch (Exception ignore) {
421: }
422: try {
423: oCsr.close();
424: oCsr = null;
425: } catch (Exception ignore) {
426: }
427: try {
428: if (oCon != null)
429: if (!oCon.isClosed())
430: oCon.close("SchedulerDaemon");
431: } catch (SQLException sqle) {
432: if (DebugFile.trace)
433: DebugFile
434: .writeln("SchedulerDaemon SQLException on close() "
435: + sqle.getMessage());
436: }
437: if (null != oDbb) {
438: try {
439: oDbb.close();
440: } catch (Exception ignore) {
441: }
442: }
443: oCon = null;
444:
445: dtStartDate = null;
446: dtStopDate = new Date();
447:
448: if (DebugFile.trace)
449: DebugFile.writeln("SchedulerDaemon SQLException "
450: + e.getMessage());
451: DebugFile
452: .writeln("SchedulerDaemon.run() abnormal termination");
453: }
454: if (DebugFile.trace)
455: DebugFile.writeln("End SchedulerDaemon.run()");
456: } // run
457:
458: // ---------------------------------------------------------------------------
459:
460: public void registerCallback(WorkerThreadCallback oNewCallback)
461: throws IllegalArgumentException {
462:
463: if (oThreadPool == null)
464: oCallbacks.addLast(oNewCallback);
465: else
466: oThreadPool.registerCallback(oNewCallback);
467: }
468:
469: // ---------------------------------------------------------------------------
470:
471: public void unregisterCallback(String sCallbackName) {
472: if (oThreadPool != null)
473: oThreadPool.unregisterCallback(sCallbackName);
474: }
475:
476: // ---------------------------------------------------------------------------
477:
478: private static void interruptJobs(JDCConnection oCon, Object[] aJobs)
479: throws SQLException {
480: int nJobs;
481: if (null == aJobs)
482: nJobs = 0;
483: else
484: nJobs = aJobs.length;
485: if (nJobs > 0) {
486: PreparedStatement oStmt = oCon.prepareStatement("UPDATE "
487: + DB.k_jobs + " SET " + DB.id_status + "="
488: + String.valueOf(Job.STATUS_INTERRUPTED)
489: + " WHERE " + DB.gu_job + "=?");
490: for (int j = 0; j < nJobs; j++) {
491: if (null != aJobs[j]) {
492: oStmt.setObject(1, aJobs[j], Types.CHAR);
493: oStmt.executeUpdate();
494: }
495: }
496: oStmt.close();
497: }
498: }
499:
500: // ---------------------------------------------------------------------------
501:
502: private static void suspendJobs(JDCConnection oCon, Object[] aJobs)
503: throws SQLException {
504: int nJobs;
505: if (null == aJobs)
506: nJobs = 0;
507: else
508: nJobs = aJobs.length;
509: if (nJobs > 0) {
510: PreparedStatement oStmt = oCon.prepareStatement("UPDATE "
511: + DB.k_jobs + " SET " + DB.id_status + "="
512: + String.valueOf(Job.STATUS_SUSPENDED) + " WHERE "
513: + DB.gu_job + "=?");
514: for (int j = 0; j < nJobs; j++) {
515: if (null != aJobs[j]) {
516: oStmt.setObject(1, aJobs[j], Types.CHAR);
517: oStmt.executeUpdate();
518: }
519: }
520: oStmt.close();
521: }
522: }
523:
524: // ---------------------------------------------------------------------------
525:
526: /**
527: * <p>Halt worker threads and set running jobs status to suspended</p>
528: * Wait until all running atoms are finished and then stop all worker threads
529: * @throws IllegalStateException If worker threads are not running
530: */
531: public void haltAll() throws IllegalStateException {
532: if (null == oThreadPool)
533: throw new IllegalStateException(
534: "SchedulerDaemon.haltAll() Thread pool not initialized, call start() method before trying to halt worker threads");
535: String[] aInitRunningJobs = oThreadPool.runningJobs();
536: oThreadPool.haltAll();
537: String[] aStillRunningJobs = oThreadPool.runningJobs();
538: if (null != oDbb) {
539: try {
540: JDCConnection oCon = oDbb
541: .getConnection("SchedulerDaemonHaltAll");
542: if (null != aInitRunningJobs) {
543: if (null != aStillRunningJobs) {
544: int nInitRunningJobs = aInitRunningJobs.length;
545: int nStillRunningJobs = aStillRunningJobs.length;
546: for (int i = 0; i < nInitRunningJobs; i++) {
547: boolean bStillRunning = false;
548: for (int j = 0; j < nStillRunningJobs
549: && !bStillRunning; j++) {
550: bStillRunning = aStillRunningJobs[j]
551: .equals(aInitRunningJobs[i]);
552: } // next
553: if (bStillRunning)
554: aInitRunningJobs[i] = null;
555: } // next
556: } // fi
557: suspendJobs(oCon, aInitRunningJobs);
558: } // fi
559: oCon.close("SchedulerDaemonHaltAll");
560: } catch (SQLException sqle) {
561: throw new IllegalStateException(
562: "SchedulerDaemon.haltAll() SQLException "
563: + sqle.getMessage());
564: }
565: }
566: }
567:
568: // ---------------------------------------------------------------------------
569:
570: /**
571: * <p>Stop worker threads and set running jobs status to interrupted</p>
572: * Call haltAll() Wait until the specified amount of miliseconds
573: * and force all worker threads still alive to stop.
574: * This method must only be used when stalled worker threads cannot be stopped
575: * by calling haltAll().
576: * @param lDelayMilis long Delay (in miliseconds) to wait before executing
577: * threads are forced to stop
578: * @throws IllegalStateException If worker threads are not running
579: */
580: public synchronized void stopAll(long lDelayMilis)
581: throws IllegalStateException, SQLException {
582:
583: if (null == oThreadPool)
584: throw new IllegalStateException(
585: "SchedulerDaemon.stopAll() Thread pool not initialized, call start() method before trying to stop worker threads");
586:
587: oThreadPool.haltAll();
588:
589: try {
590: sleep(lDelayMilis);
591: } catch (InterruptedException ignore) {
592: }
593:
594: bContinue = false;
595:
596: if (null != oDbb) {
597: JDCConnection oCon = oDbb
598: .getConnection("SchedulerDaemonStopAll");
599: oThreadPool.stopAll(oCon);
600: interruptJobs(oCon, oThreadPool.runningJobs());
601: oCon.close("SchedulerDaemonStopAll");
602: } else {
603: oThreadPool.stopAll();
604: }
605:
606: } // stopAll
607:
608: // ---------------------------------------------------------------------------
609:
610: /**
611: * <p>Stop worker threads and set running jobs status to interrupted</p>
612: * Default delay for forcing threads to stop is 10 seconds
613: * @throws IllegalStateException If worker threads are not running
614: */
615: public void stopAll() throws IllegalStateException, SQLException {
616: stopAll(10000l);
617: }
618:
619: // ---------------------------------------------------------------------------
620:
621: private static void printUsage() {
622: System.out.println("");
623: System.out.println("Usage:");
624: System.out.println("SchedulerDaemon cnf_file_path [verbose]");
625: }
626:
627: public static void main(String[] argv)
628: throws ClassNotFoundException, SQLException, IOException {
629:
630: DBBind oGlobalDBBind = new DBBind();
631: SchedulerDaemon TheDaemon;
632:
633: if (argv.length < 1 || argv.length > 2)
634: printUsage();
635:
636: else if (argv.length == 2 && !argv[1].equals("verbose"))
637: printUsage();
638:
639: else {
640:
641: TheDaemon = new SchedulerDaemon(argv[0]);
642:
643: if (argv.length == 2)
644: TheDaemon.registerCallback(new SystemOutNotify());
645:
646: TheDaemon.start();
647: }
648: }
649: } // SchedulerDaemon
|