001: package de.anomic.plasma.dbImport;
002:
003: import java.util.Vector;
004:
005: import de.anomic.plasma.plasmaSwitchboard;
006: import de.anomic.server.logging.serverLog;
007:
008: public class dbImportManager {
009:
010: public final Vector<dbImporter> finishedJobs = new Vector<dbImporter>();
011: public final ThreadGroup runningJobs = new ThreadGroup(
012: "ImporterThreads");
013: public int currMaxJobNr = 0;
014: private plasmaSwitchboard sb;
015:
016: public dbImportManager(plasmaSwitchboard theSb) {
017: this .sb = theSb;
018: }
019:
020: private int generateUniqueJobID() {
021: int jobID;
022: synchronized (this .runningJobs) {
023: jobID = this .currMaxJobNr;
024: this .currMaxJobNr++;
025: }
026: return jobID;
027: }
028:
029: public dbImporter[] getRunningImporter() {
030: Thread[] importThreads = new Thread[this .runningJobs
031: .activeCount() * 2];
032: int activeCount = this .runningJobs.enumerate(importThreads);
033: dbImporter[] importers = new dbImporter[activeCount];
034: for (int i = 0; i < activeCount; i++) {
035: importers[i] = (dbImporter) importThreads[i];
036: }
037: return importers;
038: }
039:
040: public dbImporter[] getFinishedImporter() {
041: return (dbImporter[]) this .finishedJobs
042: .toArray(new dbImporter[this .finishedJobs.size()]);
043: }
044:
045: public dbImporter getImporterByID(int jobID) {
046:
047: Thread[] importThreads = new Thread[this .runningJobs
048: .activeCount() * 2];
049: int activeCount = this .runningJobs.enumerate(importThreads);
050:
051: for (int i = 0; i < activeCount; i++) {
052: dbImporter currThread = (dbImporter) importThreads[i];
053: if (currThread.getJobID() == jobID) {
054: return currThread;
055: }
056: }
057: return null;
058: }
059:
060: public dbImporter getNewImporter(String type) {
061: if (type == null)
062: return null;
063: if (type.length() == 0)
064: return null;
065:
066: // create a new importer thread
067: dbImporter newImporter = null;
068: if (type.equalsIgnoreCase("NURL")) {
069: newImporter = new plasmaCrawlNURLImporter(this .sb);
070: } else if (type.equalsIgnoreCase("sitemap")) {
071: newImporter = new SitemapImporter(this .sb);
072: }
073:
074: // assign a job ID to it
075: newImporter.setJobID(this .generateUniqueJobID());
076:
077: // return the newly created importer
078: return newImporter;
079: }
080:
081: /**
082: * Can be used to close all still running importer threads
083: * e.g. on server shutdown
084: */
085: public void close() {
086: /* clear the finished thread list */
087: this .finishedJobs.clear();
088:
089: /* waiting for all threads to finish */
090: int threadCount = this .runningJobs.activeCount();
091: Thread[] threadList = new Thread[threadCount];
092: threadCount = this .runningJobs.enumerate(threadList);
093:
094: if (threadCount == 0)
095: return;
096:
097: serverLog log = new serverLog("DB-IMPORT");
098: try {
099: // trying to gracefull stop all still running sessions ...
100: log.logInfo("Signaling shutdown to " + threadCount
101: + " remaining dbImporter threads ...");
102: for (int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++) {
103: Thread currentThread = threadList[currentThreadIdx];
104: if (currentThread.isAlive()) {
105: ((dbImporter) currentThread).stopIt();
106: }
107: }
108:
109: // waiting a few ms for the session objects to continue processing
110: try {
111: Thread.sleep(500);
112: } catch (InterruptedException ex) {
113: }
114:
115: // interrupting all still running or pooled threads ...
116: log.logInfo("Sending interruption signal to "
117: + runningJobs.activeCount()
118: + " remaining dbImporter threads ...");
119: runningJobs.interrupt();
120:
121: // we need to use a timeout here because of missing interruptable session threads ...
122: log
123: .logFine("Waiting for "
124: + runningJobs.activeCount()
125: + " remaining dbImporter threads to finish shutdown ...");
126: for (int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++) {
127: Thread currentThread = threadList[currentThreadIdx];
128: if (currentThread.isAlive()) {
129: log.logFine("Waiting for dbImporter thread '"
130: + currentThread.getName() + "' ["
131: + currentThreadIdx
132: + "] to finish shutdown.");
133: try {
134: currentThread.join(500);
135: } catch (InterruptedException ex) {
136: }
137: }
138: }
139:
140: log
141: .logInfo("Shutdown of remaining dbImporter threads finished.");
142: } catch (Exception e) {
143: log
144: .logSevere(
145: "Unexpected error while trying to shutdown all remaining dbImporter threads.",
146: e);
147: }
148: }
149:
150: }
|