001: package de.anomic.plasma.dbImport;
002:
003: import java.io.File;
004: import java.util.HashMap;
005: import java.util.HashSet;
006: import java.util.Iterator;
007: import java.util.TreeSet;
008:
009: import de.anomic.index.indexContainer;
010: import de.anomic.index.indexRWIEntry;
011: import de.anomic.index.indexRWIRowEntry;
012: import de.anomic.index.indexURLEntry;
013: import de.anomic.plasma.plasmaSwitchboard;
014: import de.anomic.plasma.plasmaWordIndex;
015: import de.anomic.server.serverDate;
016:
017: public class plasmaDbImporter extends AbstractImporter implements
018: dbImporter {
019:
020: private File importPrimaryPath, importSecondaryPath;
021:
022: /**
023: * the source word index (the DB to import)
024: */
025: private plasmaWordIndex importWordIndex;
026:
027: /**
028: * the destination word index (the home DB)
029: */
030: protected plasmaWordIndex homeWordIndex;
031: private int importStartSize;
032:
033: private String wordHash = "------------";
034:
035: long wordChunkStart = System.currentTimeMillis(),
036: wordChunkEnd = this .wordChunkStart;
037: String wordChunkStartHash = "------------", wordChunkEndHash;
038: private long urlCounter = 0, wordCounter = 0, entryCounter = 0,
039: notBoundEntryCounter = 0;
040:
041: public plasmaDbImporter(plasmaSwitchboard sb,
042: plasmaWordIndex homeWI, plasmaWordIndex importWI) {
043: super ("PLASMADB", sb);
044: this .homeWordIndex = homeWI;
045: this .importWordIndex = importWI;
046: }
047:
048: /**
049: * @see dbImporter#getJobName()
050: */
051: public String getJobName() {
052: return this .importPrimaryPath.toString();
053: }
054:
055: /**
056: * @see dbImporter#getStatus()
057: */
058: public String getStatus() {
059: StringBuffer theStatus = new StringBuffer();
060:
061: theStatus.append("Hash=").append(this .wordHash).append("\n");
062: theStatus.append("#URL=").append(this .urlCounter).append("\n");
063: theStatus.append("#Word Entity=").append(this .wordCounter)
064: .append("\n");
065: theStatus.append("#Word Entry={").append(this .entryCounter);
066: theStatus.append(" ,NotBound=").append(
067: this .notBoundEntryCounter).append("}");
068:
069: return theStatus.toString();
070: }
071:
072: //public void init(File thePrimaryPath, File theSecondaryPath, int theCacheSize, long preloadTime) {
073: /**
074: * @throws ImporterException
075: * @see dbImporter#init(HashMap)
076: */
077: public void init(HashMap<String, String> initParams)
078: throws ImporterException {
079: super .init(initParams);
080:
081: if (initParams == null || initParams.size() == 0)
082: throw new IllegalArgumentException(
083: "Init parameters are missing");
084: if (!initParams.containsKey("primaryPath"))
085: throw new IllegalArgumentException(
086: "Init parameters 'primaryPath' is missing");
087: if (!initParams.containsKey("secondaryPath"))
088: throw new IllegalArgumentException(
089: "Init parameters 'secondaryPath' is missing");
090: if (!initParams.containsKey("cacheSize"))
091: throw new IllegalArgumentException(
092: "Init parameters 'cacheSize' is missing");
093: if (!initParams.containsKey("preloadTime"))
094: throw new IllegalArgumentException(
095: "Init parameters 'preloadTime' is missing");
096:
097: // TODO: we need more errorhandling here
098: this .importPrimaryPath = new File((String) initParams
099: .get("primaryPath"));
100: this .importSecondaryPath = new File((String) initParams
101: .get("secondaryPath"));
102:
103: this .cacheSize = Integer.valueOf(
104: (String) initParams.get("cacheSize")).intValue();
105: if (this .cacheSize < 2 * 1024 * 1024)
106: this .cacheSize = 8 * 1024 * 1024;
107:
108: this .preloadTime = Long.valueOf(
109: (String) initParams.get("preloadTime")).longValue();
110:
111: // configure import DB
112: String errorMsg = null;
113: if (!this .importPrimaryPath.exists())
114: errorMsg = "Primary Import directory does not exist.";
115: if (!this .importPrimaryPath.canRead())
116: errorMsg = "Primary Import directory is not readable.";
117: if (!this .importPrimaryPath.canWrite())
118: errorMsg = "Primary Import directory is not writeable";
119: if (!this .importPrimaryPath.isDirectory())
120: errorMsg = "Primary Import directory is not a directory.";
121: if (errorMsg != null) {
122: this .log.logSevere(errorMsg + "\nName: "
123: + this .importPrimaryPath.getAbsolutePath());
124: throw new IllegalArgumentException(errorMsg);
125: }
126: if (!this .importSecondaryPath.exists())
127: errorMsg = "Secondary Import directory does not exist.";
128: if (!this .importSecondaryPath.canRead())
129: errorMsg = "Secondary Import directory is not readable.";
130: if (!this .importSecondaryPath.canWrite())
131: errorMsg = "Secondary Import directory is not writeable";
132: if (!this .importSecondaryPath.isDirectory())
133: errorMsg = "Secondary Import directory is not a directory.";
134: if (errorMsg != null) {
135: this .log.logSevere(errorMsg + "\nName: "
136: + this .importSecondaryPath.getAbsolutePath());
137: throw new IllegalArgumentException(errorMsg);
138: }
139:
140: this .log.logFine("Initializing source word index db.");
141: this .importWordIndex = new plasmaWordIndex(
142: this .importPrimaryPath, this .importSecondaryPath,
143: preloadTime / 2, this .log);
144:
145: this .importStartSize = this .importWordIndex.size();
146: }
147:
148: public void run() {
149: try {
150: importWordsDB();
151: } finally {
152: this .globalEnd = System.currentTimeMillis();
153: //this.sb.dbImportManager.finishedJobs.add(this);
154: }
155: }
156:
157: /**
158: * @see dbImporter#getProcessingStatusPercent()
159: */
160: public int getProcessingStatusPercent() {
161: // thid seems to be better:
162: // (this.importStartSize-this.importWordIndex.size())*100/((this.importStartSize==0)?1:this.importStartSize);
163: // but maxint (2,147,483,647) could be exceeded when WordIndexes reach 20M entries
164: //return (this.importStartSize-this.importWordIndex.size())/((this.importStartSize<100)?1:(this.importStartSize)/100);
165: return (int) (this .wordCounter)
166: / ((this .importStartSize < 100) ? 1
167: : (this .importStartSize) / 100);
168: }
169:
170: /**
171: * @see dbImporter#getElapsedTime()
172: */
173: public long getEstimatedTime() {
174: return (this .wordCounter == 0) ? 0
175: : ((this .importStartSize * getElapsedTime()) / this .wordCounter)
176: - getElapsedTime();
177: }
178:
179: public void importWordsDB() {
180: this .log.logInfo("STARTING DB-IMPORT");
181:
182: try {
183: this .log.logInfo("Importing DB from '"
184: + this .importPrimaryPath.getAbsolutePath() + "'/'"
185: + this .importSecondaryPath.getAbsolutePath() + "'");
186: this .log.logInfo("Home word index contains "
187: + homeWordIndex.size() + " words and "
188: + homeWordIndex.loadedURL.size() + " URLs.");
189: this .log.logInfo("Import word index contains "
190: + this .importWordIndex.size() + " words and "
191: + this .importWordIndex.loadedURL.size() + " URLs.");
192:
193: HashSet<String> unknownUrlBuffer = new HashSet<String>();
194: HashSet<String> importedUrlBuffer = new HashSet<String>();
195:
196: // iterate over all words from import db
197: //Iterator importWordHashIterator = this.importWordIndex.wordHashes(this.wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, false);
198: Iterator<indexContainer> indexContainerIterator = this .importWordIndex
199: .indexContainerSet(this .wordChunkStartHash, false,
200: false, 100).iterator();
201: while (!isAborted() && indexContainerIterator.hasNext()) {
202:
203: TreeSet<String> entityUrls = new TreeSet<String>();
204: indexContainer newContainer = null;
205: try {
206: this .wordCounter++;
207: newContainer = (indexContainer) indexContainerIterator
208: .next();
209: this .wordHash = newContainer.getWordHash();
210:
211: // loop throug the entities of the container and get the
212: // urlhash
213: Iterator<indexRWIRowEntry> importWordIdxEntries = newContainer
214: .entries();
215: indexRWIEntry importWordIdxEntry;
216: while (importWordIdxEntries.hasNext()) {
217: // testing if import process was aborted
218: if (isAborted())
219: break;
220:
221: // getting next word index entry
222: importWordIdxEntry = (indexRWIEntry) importWordIdxEntries
223: .next();
224: String urlHash = importWordIdxEntry.urlHash();
225: entityUrls.add(urlHash);
226: }
227:
228: Iterator<String> urlIter = entityUrls.iterator();
229: while (urlIter.hasNext()) {
230: if (isAborted())
231: break;
232: String urlHash = urlIter.next();
233:
234: if (importedUrlBuffer.contains(urlHash)) {
235: // already known url
236: } else if (unknownUrlBuffer.contains(urlHash)) {
237: // url known as unknown
238: unknownUrlBuffer.add(urlHash);
239: notBoundEntryCounter++;
240: newContainer.remove(urlHash);
241: continue;
242: } else {
243: // we need to import the url
244:
245: // getting the url entry
246: indexURLEntry urlEntry = this .importWordIndex.loadedURL
247: .load(urlHash, null, 0);
248: if (urlEntry != null) {
249:
250: /* write it into the home url db */
251: homeWordIndex.loadedURL.store(urlEntry);
252: importedUrlBuffer.add(urlHash);
253: this .urlCounter++;
254:
255: if (this .urlCounter % 500 == 0) {
256: this .log
257: .logFine(this .urlCounter
258: + " URLs processed so far.");
259: }
260:
261: } else {
262: unknownUrlBuffer.add(urlHash);
263: notBoundEntryCounter++;
264: newContainer.remove(urlHash);
265: continue;
266: }
267: }
268: this .entryCounter++;
269: }
270:
271: // testing if import process was aborted
272: if (isAborted())
273: break;
274:
275: // importing entity container to home db
276: if (newContainer.size() > 0) {
277: homeWordIndex.addEntries(newContainer);
278: }
279:
280: // delete complete index entity file
281: this .importWordIndex.deleteContainer(this .wordHash);
282:
283: // print out some statistical information
284: if (this .entryCounter % 500 == 0) {
285: this .log.logFine(this .entryCounter
286: + " word entries and "
287: + this .wordCounter
288: + " word entities processed so far.");
289: }
290:
291: if (this .wordCounter % 500 == 0) {
292: this .wordChunkEndHash = this .wordHash;
293: this .wordChunkEnd = System.currentTimeMillis();
294: long duration = this .wordChunkEnd
295: - this .wordChunkStart;
296: this .log
297: .logInfo(this .wordCounter
298: + " word entities imported "
299: + "["
300: + this .wordChunkStartHash
301: + " .. "
302: + this .wordChunkEndHash
303: + "] "
304: + this
305: .getProcessingStatusPercent()
306: + "%\n"
307: + "Speed: "
308: + 500
309: * 1000
310: / duration
311: + " word entities/s"
312: + " | Elapsed time: "
313: + serverDate
314: .formatInterval(getElapsedTime())
315: + " | Estimated time: "
316: + serverDate
317: .formatInterval(getEstimatedTime())
318: + "\n" + "Home Words = "
319: + homeWordIndex.size()
320: + " | Import Words = "
321: + this .importWordIndex.size());
322: this .wordChunkStart = this .wordChunkEnd;
323: this .wordChunkStartHash = this .wordChunkEndHash;
324: }
325:
326: } catch (Exception e) {
327: this .log.logSevere("Import of word entity '"
328: + this .wordHash + "' failed.", e);
329: } finally {
330: if (newContainer != null)
331: newContainer.clear();
332: }
333:
334: if (!indexContainerIterator.hasNext()) {
335: // We may not be finished yet, try to get the next chunk of wordHashes
336: TreeSet<indexContainer> containers = this .importWordIndex
337: .indexContainerSet(this .wordHash, false,
338: false, 100);
339: indexContainerIterator = containers.iterator();
340: // Make sure we don't get the same wordhash twice, but don't skip a word
341: if ((indexContainerIterator.hasNext())
342: && (!this .wordHash
343: .equals(((indexContainer) indexContainerIterator
344: .next()).getWordHash()))) {
345: indexContainerIterator = containers.iterator();
346: }
347: }
348: }
349:
350: this .log.logInfo("Home word index contains "
351: + homeWordIndex.size() + " words and "
352: + homeWordIndex.loadedURL.size() + " URLs.");
353: this .log.logInfo("Import word index contains "
354: + this .importWordIndex.size() + " words and "
355: + this .importWordIndex.loadedURL.size() + " URLs.");
356: } catch (Exception e) {
357: this .log.logSevere("Database import failed.", e);
358: e.printStackTrace();
359: this .error = e.toString();
360: } finally {
361: this .log.logInfo("Import process finished.");
362: if (this .importWordIndex != null)
363: try {
364: this .importWordIndex.close();
365: } catch (Exception e) {
366: }
367: }
368: }
369:
370: }
|