001: package com.knowgate.scheduler;
002:
003: import java.sql.DriverManager;
004: import java.sql.SQLException;
005: import java.sql.Statement;
006: import java.sql.ResultSet;
007: import java.sql.ResultSetMetaData;
008:
009: import java.io.FileInputStream;
010: import java.io.IOException;
011: import java.io.FileNotFoundException;
012:
013: import java.util.Properties;
014: import java.util.LinkedList;
015: import java.util.ListIterator;
016:
017: import javax.mail.MessagingException;
018:
019: import com.knowgate.jdc.JDCConnection;
020:
021: import com.knowgate.dataobjs.DB;
022: import com.knowgate.dataobjs.DBBind;
023: import com.knowgate.dataobjs.DBSubset;
024:
025: import com.knowgate.misc.Gadgets;
026:
027: /**
028: * <p>Single Thread Scheduler Executor</p>
029: * <p>SingleThreadExecutor is a class that processes jobs and atoms in a simple way,
030: * unlike SchedulerDaemon witch is based on an AtomQueue and a WorkerThreadPool,
031: * SingleThreadExecutor uses directly the database for tracking execution progress
032: * for a single thread.</p>
033: * @author Sergio Montoro Ten
034: * @version 1.0
035: */
036:
037: public class SingleThreadExecutor extends Thread {
038:
039: private String sEnvProps;
040:
041: private Properties oEnvProps;
042:
043: private boolean bContinue;
044:
045: private String sLastError;
046:
047: private String sJob;
048:
049: private Job oJob;
050:
051: private Atom oAtm;
052:
053: private LinkedList oCallbacks;
054:
055: private int iCallbacks;
056:
057: // ---------------------------------------------------------------------------
058:
059: private static class SystemOutNotify extends WorkerThreadCallback {
060:
061: public SystemOutNotify() {
062: super ("SystemOutNotify");
063: }
064:
065: public void call(String sThreadId, int iOpCode,
066: String sMessage, Exception oXcpt, Object oParam) {
067:
068: if (WorkerThreadCallback.WT_EXCEPTION == iOpCode)
069: System.out.println("Thread " + sThreadId + ": ERROR "
070: + sMessage);
071: else
072: System.out.println("Thread " + sThreadId + ": "
073: + sMessage);
074: }
075: }
076:
077: // ---------------------------------------------------------------------------
078:
079: /**
080: * <p>Create new SingleThreadExecutor</p>
081: * @param sPropertiesFilePath Absolute path to hipergate.cnf properties file
082: * @throws FileNotFoundException
083: * @throws IOException
084: */
085: public SingleThreadExecutor(String sPropertiesFilePath)
086: throws FileNotFoundException, IOException {
087:
088: sJob = null;
089:
090: bContinue = true;
091:
092: if (sPropertiesFilePath.lastIndexOf(System
093: .getProperty("file.separator")) == -1)
094: sEnvProps = sPropertiesFilePath;
095: else
096: sEnvProps = sPropertiesFilePath
097: .substring(sPropertiesFilePath.lastIndexOf(System
098: .getProperty("file.separator")) + 1);
099:
100: FileInputStream oInProps = new FileInputStream(
101: sPropertiesFilePath);
102: oEnvProps = new Properties();
103: oEnvProps.load(oInProps);
104: oInProps.close();
105:
106: oCallbacks = new LinkedList();
107: }
108:
109: /**
110: * <p>Create new SingleThreadExecutor for a single Job</p>
111: * @param sPropertiesFilePath Absolute path to hipergate.cnf properties file
112: * @throws FileNotFoundException
113: * @throws IOException
114: */
115: public SingleThreadExecutor(String sPropertiesFilePath,
116: String sJobId) throws FileNotFoundException, IOException {
117:
118: sJob = sJobId;
119:
120: bContinue = true;
121:
122: if (sPropertiesFilePath.lastIndexOf(System
123: .getProperty("file.separator")) == -1)
124: sEnvProps = sPropertiesFilePath;
125: else
126: sEnvProps = sPropertiesFilePath
127: .substring(sPropertiesFilePath.lastIndexOf(System
128: .getProperty("file.separator")) + 1);
129:
130: FileInputStream oInProps = new FileInputStream(
131: sPropertiesFilePath);
132: oEnvProps = new Properties();
133: oEnvProps.load(oInProps);
134: oInProps.close();
135:
136: oCallbacks = new LinkedList();
137: }
138:
139: // ---------------------------------------------------------------------------
140:
141: public Atom activeAtom() {
142: return oAtm;
143: }
144:
145: // ---------------------------------------------------------------------------
146:
147: public Job activeJob() {
148: return oJob;
149: }
150:
151: // ---------------------------------------------------------------------------
152:
153: public String lastError() {
154: return sLastError;
155: }
156:
157: // ---------------------------------------------------------------------------
158:
159: /**
160: * Register a thread callback object
161: * @param oNewCallback WorkerThreadCallback subclass instance
162: * @throws IllegalArgumentException If a callback with same name has oNewCallback was already registered
163: */
164: public void registerCallback(WorkerThreadCallback oNewCallback)
165: throws IllegalArgumentException {
166:
167: WorkerThreadCallback oCallback;
168: ListIterator oIter = oCallbacks.listIterator();
169:
170: while (oIter.hasNext()) {
171: oCallback = (WorkerThreadCallback) oIter.next();
172:
173: if (oCallback.name().equals(oNewCallback.name())) {
174: throw new IllegalArgumentException("Callback "
175: + oNewCallback.name()
176: + " is already registered");
177: } // fi
178: } // wend
179:
180: oCallbacks.addLast(oNewCallback);
181: iCallbacks++;
182: } // registerCallback
183:
184: // ---------------------------------------------------------------------------
185:
186: /**
187: * Unregister a thread callback object
188: * @param sCallbackName Name of callback to be unregistered
189: * @return <b>true</b> if a callback with such name was found and unregistered,
190: * <b>false</b> otherwise
191: */
192: public boolean unregisterCallback(String sCallbackName) {
193: WorkerThreadCallback oCallback;
194: ListIterator oIter = oCallbacks.listIterator();
195:
196: while (oIter.hasNext()) {
197: oCallback = (WorkerThreadCallback) oIter.next();
198:
199: if (oCallback.name().equals(sCallbackName)) {
200: oIter.remove();
201: iCallbacks--;
202: return true;
203: } // fi
204: } // wend
205:
206: return false;
207: } // unregisterCallback
208:
209: // ---------------------------------------------------------------------------
210:
211: private void callBack(int iOpCode, String sMessage,
212: Exception oXcpt, Object oParam) {
213:
214: WorkerThreadCallback oCallback;
215: ListIterator oIter = oCallbacks.listIterator();
216:
217: while (oIter.hasNext()) {
218: oCallback = (WorkerThreadCallback) oIter.next();
219: oCallback.call(getName(), iOpCode, sMessage, oXcpt, oParam);
220: } // wend
221:
222: } // callBack
223:
224: // ---------------------------------------------------------------------------
225:
226: public void run() {
227: Statement oStm;
228: JDCConnection oCon;
229: AtomFeeder oFdr;
230: DBSubset oDBS;
231: String sSQL;
232: String sJId;
233: ResultSet oRst;
234: ResultSetMetaData oMDt;
235:
236: DBBind oDBB = null;
237:
238: try {
239: oDBB = new DBBind(sEnvProps);
240:
241: oCon = new JDCConnection(DriverManager.getConnection(
242: oEnvProps.getProperty("dburl"), oEnvProps
243: .getProperty("dbuser"), oEnvProps
244: .getProperty("dbpassword")), null);
245: bContinue = true;
246:
247: sLastError = "";
248:
249: while (bContinue) {
250:
251: oFdr = new AtomFeeder();
252:
253: if (sJob == null)
254: oDBS = oFdr.loadAtoms(oCon, 1);
255: else
256: oDBS = oFdr.loadAtoms(oCon, sJob);
257:
258: if (oDBS.getRowCount() > 0) {
259:
260: sJId = oDBS.getString(0, 0);
261:
262: oJob = Job.instantiate(oCon, sJId, oEnvProps);
263:
264: oStm = oCon.createStatement(
265: ResultSet.TYPE_FORWARD_ONLY,
266: ResultSet.CONCUR_READ_ONLY);
267:
268: sSQL = "SELECT a.*, j." + DB.tx_parameters
269: + " FROM " + DB.k_job_atoms + " a, "
270: + DB.k_jobs + " j WHERE a." + DB.id_status
271: + "=" + String.valueOf(Atom.STATUS_PENDING)
272: + " AND j." + DB.gu_job + "=a." + DB.gu_job
273: + " AND j." + DB.gu_job + "='" + sJId + "'";
274:
275: oRst = oStm.executeQuery(sSQL);
276: oMDt = oRst.getMetaData();
277:
278: while (oRst.next()) {
279:
280: oAtm = new Atom(oRst, oMDt);
281:
282: oJob.process(oAtm);
283:
284: } // wend
285: oRst.close();
286: oStm.close();
287: } else
288: bContinue = false;
289: } // wend
290:
291: oCon.close();
292:
293: oDBB.close();
294: } catch (MessagingException e) {
295:
296: oDBB.close();
297:
298: sLastError = "MessagingException " + e.getMessage();
299:
300: if (iCallbacks > 0)
301: callBack(-1, sLastError, new MessagingException(e
302: .getMessage(), e.getNextException()), null);
303:
304: if (oJob != null)
305: oJob.log(sLastError + "\n");
306:
307: } catch (SQLException e) {
308:
309: if (null != oDBB)
310: oDBB.close();
311:
312: sLastError = "SQLException " + e.getMessage();
313:
314: if (iCallbacks > 0)
315: callBack(-1, sLastError, new SQLException(e
316: .getMessage(), e.getSQLState(), e
317: .getErrorCode()), null);
318:
319: if (oJob != null)
320: oJob.log(sLastError + "\n");
321:
322: } catch (FileNotFoundException e) {
323: if (null != oDBB)
324: oDBB.close();
325:
326: sLastError = "FileNotFoundException " + e.getMessage();
327:
328: if (iCallbacks > 0)
329: callBack(-1, sLastError, new FileNotFoundException(e
330: .getMessage()), null);
331:
332: if (oJob != null)
333: oJob.log(sLastError + "\n");
334: } catch (IOException e) {
335: if (null != oDBB)
336: oDBB.close();
337:
338: sLastError = "IOException " + e.getMessage();
339:
340: if (iCallbacks > 0)
341: callBack(-1, sLastError,
342: new IOException(e.getMessage()), null);
343:
344: if (oJob != null)
345: oJob.log(sLastError + "\n");
346: } catch (ClassNotFoundException e) {
347:
348: if (null != oDBB)
349: oDBB.close();
350:
351: sLastError = "ClassNotFoundException " + e.getMessage();
352:
353: if (iCallbacks > 0)
354: callBack(-1, sLastError, new ClassNotFoundException(e
355: .getMessage()), null);
356:
357: if (oJob != null)
358: oJob.log(sLastError + "\n");
359: } catch (InstantiationException e) {
360:
361: if (null != oDBB)
362: oDBB.close();
363:
364: sLastError = "InstantiationException " + e.getMessage();
365:
366: if (iCallbacks > 0)
367: callBack(-1, sLastError, new InstantiationException(e
368: .getMessage()), null);
369:
370: if (oJob != null)
371: oJob.log(sLastError + "\n");
372: } catch (IllegalAccessException e) {
373:
374: if (null != oDBB)
375: oDBB.close();
376:
377: sLastError = "IllegalAccessException " + e.getMessage();
378:
379: if (iCallbacks > 0)
380: callBack(-1, sLastError, new IllegalAccessException(e
381: .getMessage()), null);
382:
383: if (oJob != null)
384: oJob.log(sLastError + "\n");
385: } catch (NullPointerException e) {
386:
387: if (null != oDBB)
388: oDBB.close();
389:
390: sLastError = "NullPointerException " + e.getMessage();
391:
392: if (iCallbacks > 0)
393: callBack(-1, sLastError, new NullPointerException(e
394: .getMessage()), null);
395:
396: if (oJob != null)
397: oJob.log(sLastError + "\n");
398: }
399: } // run
400:
401: // ---------------------------------------------------------------------------
402:
403: /**
404: * <p>Halt thread execution commiting all operations in course before stopping</p>
405: * If a thread is dead-locked by any reason halting it will not cause any effect.<br>
406: * halt() method only sends a signals to the each WokerThread telling it that must
407: * finish pending operations and stop.
408: */
409: public void halt() {
410: bContinue = false;
411: }
412:
413: // ***************************************************************************
414: // Static Methods
415:
416: private static void printUsage() {
417: System.out.println("");
418: System.out.println("Usage:");
419: System.out
420: .println("SingleThreadExecutor {run | lrun} job_type cnf_file_path {gu_job | xml_file_path} [verbose]");
421: System.out
422: .println("job_type is one of {MAIL | FAX | SAVE | FTP}");
423: }
424:
425: public static void main(String[] argv)
426: throws java.io.FileNotFoundException, java.io.IOException,
427: SQLException, ClassNotFoundException,
428: IllegalAccessException, InstantiationException,
429: org.xml.sax.SAXException {
430:
431: SingleThreadExecutor oExec;
432:
433: if (argv.length != 4 && argv.length != 5)
434: printUsage();
435:
436: else if (argv.length == 5 && !argv[4].equals("verbose"))
437: printUsage();
438:
439: else if (!argv[0].equals("run") && !argv[0].equals("lrun"))
440: printUsage();
441:
442: else if (!argv[1].equalsIgnoreCase("MAIL")
443: && !argv[1].equalsIgnoreCase("FAX")
444: && !argv[1].equalsIgnoreCase("SAVE")
445: && !argv[1].equalsIgnoreCase("FTP"))
446: printUsage();
447:
448: else {
449:
450: if (argv[0].equals("run"))
451: oExec = new SingleThreadExecutor(argv[2], argv[3]);
452:
453: else {
454: String sJobGUID = Gadgets.generateUUID();
455:
456: Job.main(new String[] { "create", argv[1], argv[2],
457: argv[3], sJobGUID });
458:
459: oExec = new SingleThreadExecutor(argv[2], sJobGUID);
460: }
461:
462: if (argv.length == 5)
463: oExec.registerCallback(new SystemOutNotify());
464:
465: oExec.start();
466: } // fi
467: } // main
468:
469: } // SingleThreadExecutor
|