001: /*
002: Copyright (C) 2003 Know Gate S.L. All rights reserved.
003: C/Oña, 107 1º2 28050 Madrid (Spain)
004:
005: Redistribution and use in source and binary forms, with or without
006: modification, are permitted provided that the following conditions
007: are met:
008:
009: 1. Redistributions of source code must retain the above copyright
010: notice, this list of conditions and the following disclaimer.
011:
012: 2. The end-user documentation included with the redistribution,
013: if any, must include the following acknowledgment:
014: "This product includes software parts from hipergate
015: (http://www.hipergate.org/)."
016: Alternately, this acknowledgment may appear in the software itself,
017: if and wherever such third-party acknowledgments normally appear.
018:
019: 3. The name hipergate must not be used to endorse or promote products
020: derived from this software without prior written permission.
021: Products derived from this software may not be called hipergate,
022: nor may hipergate appear in their name, without prior written
023: permission.
024:
025: This library is distributed in the hope that it will be useful,
026: but WITHOUT ANY WARRANTY; without even the implied warranty of
027: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
028:
029: You should have received a copy of hipergate License with this code;
030: if not, visit http://www.hipergate.org or mail to info@hipergate.org
031: */
032:
033: package com.knowgate.scheduler;
034:
035: import java.lang.Thread;
036: import java.util.Date;
037: import java.util.Properties;
038: import java.util.LinkedList;
039: import java.util.ListIterator;
040:
041: import java.sql.SQLException;
042: import java.sql.Connection;
043: import java.io.IOException;
044: import java.io.FileNotFoundException;
045: import java.io.File;
046: import java.io.FileInputStream;
047: import java.io.FileOutputStream;
048:
049: import javax.mail.MessagingException;
050:
051: import com.knowgate.jdc.JDCConnection;
052: import com.knowgate.dataobjs.DB;
053: import com.knowgate.dataxslt.*;
054: import com.knowgate.dataxslt.db.PageSetDB;
055: import com.knowgate.debug.DebugFile;
056: import com.knowgate.scheduler.*;
057: import com.knowgate.crm.DistributionList;
058:
059: /**
060: * <p>Scheduled Job Worker Thread</p>
061: * @author Sergio Montoro Ten
062: * @version 1.0
063: */
064:
065: public class WorkerThread extends Thread {
066:
067: private String sLastError;
068: private Job oJob; // Current Job
069: private Atom oAtm; // Atom being processed
070: private long lRunningTime;
071: private int delay = 1; // Thread sleeps n miliseconds on each loop
072: private AtomConsumer oConsumer;
073: private WorkerThreadPool oPool;
074: private LinkedList oCallbacks;
075: private int iCallbacks;
076: private boolean bContinue;
077:
078: // ----------------------------------------------------------
079:
080: /**
081: * Create WorkerThread
082: * @param oThreadPool
083: * @param oAtomConsumer
084: */
085:
086: public WorkerThread(WorkerThreadPool oThreadPool,
087: AtomConsumer oAtomConsumer) {
088: oConsumer = oAtomConsumer;
089: oPool = oThreadPool;
090: oCallbacks = new LinkedList();
091: iCallbacks = 0;
092: oJob = null;
093: sLastError = "";
094: lRunningTime = 0;
095: }
096:
097: // ----------------------------------------------------------
098:
099: public int getDelayMS() {
100: return delay;
101: }
102:
103: // ----------------------------------------------------------
104:
105: public void getDelayMS(int iMiliseconds) {
106: delay = iMiliseconds;
107: }
108:
109: // ----------------------------------------------------------
110:
111: public long getRunningTimeMS() {
112: return lRunningTime;
113: }
114:
115: // ----------------------------------------------------------
116:
117: public void setConsumer(AtomConsumer oAtomConsumer) {
118: oConsumer = oAtomConsumer;
119: }
120:
121: // ----------------------------------------------------------
122:
123: /**
124: * Get Environment property from hipergate.cnf
125: * @param sKey Property Name
126: * @return Property Value or <b>null</b> if not found
127: */
128: public String getProperty(String sKey) {
129: return oPool.getProperty(sKey);
130: }
131:
132: // ---------------------------------------------------------------------------
133:
134: public Atom activeAtom() {
135: return oAtm;
136: }
137:
138: // ---------------------------------------------------------------------------
139:
140: public Job activeJob() {
141: return oJob;
142: }
143:
144: // ---------------------------------------------------------------------------
145:
146: public String lastError() {
147: return sLastError;
148: }
149:
150: // ---------------------------------------------------------------------------
151:
152: /**
153: * Register a thread callback object
154: * @param oNewCallback WorkerThreadCallback subclass instance
155: * @throws IllegalArgumentException If a callback with same name has oNewCallback was already registered
156: */
157: public void registerCallback(WorkerThreadCallback oNewCallback)
158: throws IllegalArgumentException {
159:
160: WorkerThreadCallback oCallback;
161: ListIterator oIter = oCallbacks.listIterator();
162:
163: while (oIter.hasNext()) {
164: oCallback = (WorkerThreadCallback) oIter.next();
165:
166: if (oCallback.name().equals(oNewCallback.name())) {
167: throw new IllegalArgumentException("Callback "
168: + oNewCallback.name()
169: + " is already registered");
170: } // fi
171: } // wend
172:
173: oCallbacks.addLast(oNewCallback);
174: iCallbacks++;
175: } // registerCallback
176:
177: // ---------------------------------------------------------------------------
178:
179: /**
180: * Unregister a thread callback object
181: * @param sCallbackName Name of callback to be unregistered
182: * @return <b>true</b> if a callback with such name was found and unregistered,
183: * <b>false</b> otherwise
184: */
185: public boolean unregisterCallback(String sCallbackName) {
186: WorkerThreadCallback oCallback;
187: ListIterator oIter = oCallbacks.listIterator();
188:
189: while (oIter.hasNext()) {
190: oCallback = (WorkerThreadCallback) oIter.next();
191:
192: if (oCallback.name().equals(sCallbackName)) {
193: oIter.remove();
194: iCallbacks--;
195: return true;
196: } // fi
197: } // wend
198:
199: return false;
200: } // unregisterCallback
201:
202: // ---------------------------------------------------------------------------
203:
204: private void callBack(int iOpCode, String sMessage,
205: Exception oXcpt, Object oParam) {
206: WorkerThreadCallback oCallback;
207: ListIterator oIter = oCallbacks.listIterator();
208:
209: while (oIter.hasNext()) {
210: oCallback = (WorkerThreadCallback) oIter.next();
211: oCallback.call(getName(), iOpCode, sMessage, oXcpt, oParam);
212: } // wend
213:
214: }
215:
216: // ---------------------------------------------------------------------------
217:
218: /**
219: * <p>Process atoms obtained throught AtomConsumer</p>
220: * Each worker WorkerThread will enter an endless loop until the queue is empty
221: * or an interrupt signal is received.<br>
222: * If an exception is thrown while creating of processing atoms the workerthread
223: * will be aborted.
224: */
225: public void run() {
226: String sJob = ""; // Current Job Unique Id.
227: JDCConnection oConsumerConnection = null;
228:
229: if (DebugFile.trace) {
230: DebugFile.writeln("Begin WorkerThread.run()");
231: DebugFile.incIdent();
232: DebugFile.writeln("thread=" + getName());
233: }
234:
235: bContinue = true;
236:
237: sLastError = "";
238:
239: while (bContinue) {
240:
241: try {
242: if (delay > 0)
243: sleep(delay);
244:
245: long lStartRun = new Date().getTime();
246:
247: if (DebugFile.trace)
248: DebugFile.writeln(getName()
249: + " getting next atom...");
250:
251: oAtm = oConsumer.next();
252:
253: if (oAtm == null) {
254: // No more atoms to consume
255: if (DebugFile.trace)
256: DebugFile
257: .writeln(getName() + " no more atoms.");
258:
259: if (iCallbacks > 0)
260: callBack(
261: WorkerThreadCallback.WT_ATOMCONSUMER_NOMORE,
262: "Thread " + getName()
263: + " no more Atoms", null,
264: oConsumer);
265:
266: break;
267: }
268:
269: if (iCallbacks > 0)
270: callBack(WorkerThreadCallback.WT_ATOM_GET,
271: "Thread "
272: + getName()
273: + " got Atom "
274: + String.valueOf(oAtm
275: .getInt(DB.pg_atom)), null,
276: oAtm);
277:
278: oConsumerConnection = oConsumer.getConnection();
279:
280: if (DebugFile.trace)
281: DebugFile
282: .writeln(getName()
283: + " AtomConsumer.getConnection() : "
284: + (oConsumerConnection != null ? "[Conenction]"
285: : "null"));
286:
287: // ***********************************
288: // Instantiate the proper Job subclass
289:
290: if (!sJob.equals(oAtm.getString(DB.gu_job))) {
291:
292: // The Job is only re-loaded if it is different from the previous job at this thread
293: // this is a Job instance reuse policy for better performance.
294:
295: sJob = oAtm.getString(DB.gu_job);
296:
297: try {
298: // Dynamically instantiate the job subclass specified at k_lu_job_commands table
299: oJob = Job.instantiate(oConsumerConnection,
300: sJob, oPool.getProperties());
301:
302: if (iCallbacks > 0)
303: callBack(
304: WorkerThreadCallback.WT_JOB_INSTANTIATE,
305: "instantiate job "
306: + sJob
307: + " command "
308: + oJob
309: .getString(DB.id_command),
310: null, oJob);
311: } catch (ClassNotFoundException e) {
312: sJob = "";
313: oJob = null;
314: sLastError = "Job.instantiate(" + sJob
315: + ") ClassNotFoundException "
316: + e.getMessage();
317:
318: if (DebugFile.trace)
319: DebugFile.writeln(getName() + " "
320: + sLastError);
321:
322: if (iCallbacks > 0)
323: callBack(-1, sLastError, e, null);
324:
325: bContinue = false;
326: } catch (IllegalAccessException e) {
327: sJob = "";
328: oJob = null;
329: sLastError = "Job.instantiate(" + sJob
330: + ") IllegalAccessException "
331: + e.getMessage();
332:
333: if (DebugFile.trace)
334: DebugFile.writeln(getName() + " "
335: + sLastError);
336:
337: if (iCallbacks > 0)
338: callBack(-1, sLastError, e, null);
339:
340: bContinue = false;
341: } catch (InstantiationException e) {
342: sJob = "";
343: oJob = null;
344: sLastError = "Job.instantiate(" + sJob
345: + ") InstantiationException "
346: + e.getMessage();
347:
348: if (DebugFile.trace)
349: DebugFile.writeln(getName() + " "
350: + sLastError);
351:
352: if (iCallbacks > 0)
353: callBack(-1, sLastError, e, null);
354:
355: bContinue = false;
356: } catch (SQLException e) {
357: sJob = "";
358: oJob = null;
359: sLastError = " Job.instantiate(" + sJob
360: + ") SQLException " + e.getMessage();
361:
362: if (DebugFile.trace)
363: DebugFile.writeln(getName() + " "
364: + sLastError);
365:
366: if (iCallbacks > 0)
367: callBack(-1, sLastError, e, null);
368:
369: bContinue = false;
370: }
371: } // fi(Previous_Job == CurrentAtom->Job)
372:
373: // ---------------------------------------------------------------------
374:
375: if (null != oJob) {
376:
377: // -------------------------------------------------------------------
378: // Actual Atom processing call here!
379:
380: oJob.process(oAtm);
381:
382: if (DebugFile.trace)
383: DebugFile.writeln("Thread "
384: + getName()
385: + " consumed Atom "
386: + String.valueOf(oAtm
387: .getInt(DB.pg_atom)));
388:
389: // Move Atom register from k_job_atoms to k_job_atoms_archived
390: oAtm.archive(oConsumerConnection);
391:
392: if (iCallbacks > 0)
393: callBack(WorkerThreadCallback.WT_ATOM_CONSUME,
394: "Thread "
395: + getName()
396: + " consumed Atom "
397: + String.valueOf(oAtm
398: .getInt(DB.pg_atom)),
399: null, oAtm);
400:
401: oAtm = null;
402:
403: if (DebugFile.trace)
404: DebugFile.writeln("job "
405: + oJob.getString(DB.gu_job)
406: + " pending "
407: + String.valueOf(oJob.pending()));
408:
409: if (oJob.pending() == 0) {
410: oJob.setStatus(oConsumerConnection,
411: Job.STATUS_FINISHED);
412:
413: if (iCallbacks > 0)
414: callBack(
415: WorkerThreadCallback.WT_JOB_FINISH,
416: "finish", null, oJob);
417: }
418:
419: // -------------------------------------------------------------------
420:
421: } // fi (oJob)
422: else {
423: oAtm = null;
424: sLastError = "Job.instantiate(" + sJob
425: + ") returned null";
426: if (DebugFile.trace)
427: DebugFile.writeln("ERROR: " + sLastError);
428:
429: if (iCallbacks > 0)
430: callBack(
431: -1,
432: sLastError,
433: new NullPointerException(
434: "Job.instantiate(" + sJob + ")"),
435: null);
436:
437: bContinue = false;
438: }
439: oConsumerConnection = null;
440: lRunningTime += new Date().getTime() - lStartRun;
441: } catch (Exception e) {
442:
443: if (DebugFile.trace)
444: DebugFile.writeln(getName() + " "
445: + e.getClass().getName() + " "
446: + e.getMessage());
447:
448: if (null != oJob) {
449: sLastError = e.getClass().getName() + ", job "
450: + oJob.getString(DB.gu_job) + " ";
451: if (null != oAtm) {
452: sLastError = "atom "
453: + String.valueOf(oAtm
454: .getInt(DB.pg_atom)) + " ";
455: if (null != oConsumerConnection) {
456: try {
457: oAtm.setStatus(oConsumerConnection,
458: Atom.STATUS_INTERRUPTED, e
459: .getClass().getName()
460: + " " + e.getMessage());
461: } catch (SQLException sqle) {
462: if (DebugFile.trace)
463: DebugFile
464: .writeln("Atom.setStatus() SQLException "
465: + sqle.getMessage());
466: }
467: }
468: }
469: sLastError += e.getMessage();
470:
471: oJob.log(getName() + " " + e.getClass().getName()
472: + ", job " + oJob.getString(DB.gu_job)
473: + " ");
474: if (null != oAtm)
475: oJob.log("atom "
476: + String.valueOf(oAtm
477: .getInt(DB.pg_atom)) + " ");
478: oJob.log(e.getMessage() + "\n");
479: } // fi (oJob)
480: else
481: sLastError = e.getClass().getName() + " "
482: + e.getMessage();
483:
484: if (iCallbacks > 0)
485: callBack(-1, sLastError, e, oJob);
486:
487: bContinue = false;
488: } finally {
489: sJob = "";
490: oAtm = null;
491: }
492: } // wend
493:
494: if (oJob != null) {
495: oJob.free();
496: oJob = null;
497: }
498:
499: if (DebugFile.trace) {
500: DebugFile.decIdent();
501: DebugFile.writeln("End WorkerThread.run()");
502: }
503: } // run
504:
505: // ---------------------------------------------------------------------------
506:
507: /**
508: * <p>Halt thread execution commiting all operations in course before stopping</p>
509: * If a thread is dead-locked by any reason halting it will not cause any effect.<br>
510: * halt() method only sends a signals to the each WokerThread telling it that must
511: * finish pending operations and stop.
512: */
513: public void halt() {
514: bContinue = false;
515: }
516:
517: // ---------------------------------------------------------------------------
518:
519: } // WorkerThread
|