001: package de.anomic.plasma.dbImport;
002:
003: import java.io.File;
004: import java.io.IOException;
005: import java.util.HashMap;
006: import java.util.HashSet;
007: import java.util.Iterator;
008:
009: import de.anomic.plasma.plasmaCrawlEntry;
010: import de.anomic.plasma.plasmaCrawlNURL;
011: import de.anomic.plasma.plasmaCrawlProfile;
012: import de.anomic.plasma.plasmaSwitchboard;
013:
014: public class plasmaCrawlNURLImporter extends AbstractImporter implements
015: dbImporter {
016:
017: private File plasmaPath = null;
018: private HashSet<String> importProfileHandleCache = new HashSet<String>();
019: private plasmaCrawlProfile importProfileDB;
020: private plasmaCrawlNURL importNurlDB;
021: private int importStartSize;
022: private int urlCount = 0;
023: private int profileCount = 0;
024:
025: public plasmaCrawlNURLImporter(plasmaSwitchboard theSb) {
026: super ("NURL", theSb);
027: }
028:
029: public long getEstimatedTime() {
030: return (this .urlCount == 0) ? 0
031: : ((this .importStartSize * getElapsedTime()) / (this .urlCount))
032: - getElapsedTime();
033: }
034:
035: public String getJobName() {
036: return this .plasmaPath.toString();
037: }
038:
039: public int getProcessingStatusPercent() {
040: return (this .urlCount)
041: / ((this .importStartSize < 100) ? 1
042: : (this .importStartSize) / 100);
043: }
044:
045: public String getStatus() {
046: StringBuffer theStatus = new StringBuffer();
047:
048: theStatus.append("#URLs=").append(this .urlCount).append("\n");
049: theStatus.append("#Profiles=").append(this .profileCount);
050:
051: return theStatus.toString();
052: }
053:
054: public void init(HashMap<String, String> initParams)
055: throws ImporterException {
056: super .init(initParams);
057:
058: if (initParams == null || initParams.size() == 0)
059: throw new IllegalArgumentException(
060: "Init parameters are missing");
061: if (!initParams.containsKey("plasmaPath"))
062: throw new IllegalArgumentException(
063: "Init parameters 'plasmaPath' is missing");
064: if (!initParams.containsKey("cacheSize"))
065: throw new IllegalArgumentException(
066: "Init parameters 'cacheSize' is missing");
067: if (!initParams.containsKey("preloadTime"))
068: throw new IllegalArgumentException(
069: "Init parameters 'preloadTime' is missing");
070:
071: // TODO: we need more errorhandling here
072: this .plasmaPath = new File((String) initParams
073: .get("plasmaPath"));
074:
075: this .cacheSize = Integer.valueOf(
076: (String) initParams.get("cacheSize")).intValue();
077: if (this .cacheSize < 2 * 1024 * 1024)
078: this .cacheSize = 8 * 1024 * 1024;
079:
080: this .preloadTime = Long.valueOf(
081: (String) initParams.get("preloadTime")).longValue();
082:
083: File noticeUrlDbFile = new File(plasmaPath, "urlNotice1.db");
084: File profileDbFile = new File(plasmaPath,
085: plasmaSwitchboard.DBFILE_ACTIVE_CRAWL_PROFILES);
086:
087: String errorMsg = null;
088: if (!plasmaPath.exists())
089: errorMsg = "The import path '" + plasmaPath
090: + "' does not exist.";
091: else if (!plasmaPath.isDirectory())
092: errorMsg = "The import path '" + plasmaPath
093: + "' is not a directory.";
094: else if (!plasmaPath.canRead())
095: errorMsg = "The import path '" + plasmaPath
096: + "' is not readable.";
097: else if (!plasmaPath.canWrite())
098: errorMsg = "The import path '" + plasmaPath
099: + "' is not writeable.";
100:
101: else if (!noticeUrlDbFile.exists())
102: errorMsg = "The noticeUrlDB file '" + noticeUrlDbFile
103: + "' does not exist.";
104: else if (noticeUrlDbFile.isDirectory())
105: errorMsg = "The noticeUrlDB file '" + noticeUrlDbFile
106: + "' is not a file.";
107: else if (!noticeUrlDbFile.canRead())
108: errorMsg = "The noticeUrlDB file '" + noticeUrlDbFile
109: + "' is not readable.";
110: else if (!noticeUrlDbFile.canWrite())
111: errorMsg = "The noticeUrlDB file '" + noticeUrlDbFile
112: + "' is not writeable.";
113:
114: else if (!profileDbFile.exists())
115: errorMsg = "The profileDB file '" + profileDbFile
116: + "' does not exist.";
117: else if (profileDbFile.isDirectory())
118: errorMsg = "The profileDB file '" + profileDbFile
119: + "' is not a file.";
120: else if (!profileDbFile.canRead())
121: errorMsg = "The profileDB file '" + profileDbFile
122: + "' is not readable.";
123: // else if (!profileDbFile.canWrite())
124: // errorMsg = "The profileDB file '" + profileDbFile + "' is not writeable.";
125:
126: if (errorMsg != null) {
127: this .log.logSevere(errorMsg);
128: throw new IllegalArgumentException(errorMsg);
129: }
130:
131: // init noticeUrlDB
132: this .log.logInfo("Initializing the source noticeUrlDB");
133: this .importNurlDB = new plasmaCrawlNURL(plasmaPath);
134: this .importStartSize = this .importNurlDB.size();
135: //int stackSize = this.importNurlDB.stackSize();
136:
137: // init profile DB
138: this .log.logInfo("Initializing the source profileDB");
139: this .importProfileDB = new plasmaCrawlProfile(profileDbFile,
140: 300);
141: }
142:
143: @SuppressWarnings("unchecked")
144: public void run() {
145: try {
146: // waiting on init thread to finish
147: //this.importNurlDB.waitOnInitThread();
148:
149: // the stack types we want to import
150: int[] stackTypes = new int[] {
151: plasmaCrawlNURL.STACK_TYPE_CORE,
152: plasmaCrawlNURL.STACK_TYPE_LIMIT,
153: plasmaCrawlNURL.STACK_TYPE_REMOTE, -1 };
154:
155: // looping through the various stacks
156: for (int stackType = 0; stackType < stackTypes.length; stackType++) {
157: if (stackTypes[stackType] != -1) {
158: this .log.logInfo("Starting to import stacktype '"
159: + stackTypes[stackType]
160: + "' containing '"
161: + this .importNurlDB
162: .stackSize(stackTypes[stackType])
163: + "' entries.");
164: } else {
165: this .log.logInfo("Starting to import '"
166: + this .importNurlDB.size()
167: + "' entries not available in any stack.");
168: }
169:
170: // getting an iterator and loop through the URL entries
171: Iterator<plasmaCrawlEntry> entryIter = (stackTypes[stackType] == -1) ? this .importNurlDB
172: .iterator(stackType)
173: : null;
174: while (true) {
175:
176: String nextHash = null;
177: plasmaCrawlEntry nextEntry = null;
178:
179: try {
180: if (stackTypes[stackType] != -1) {
181: if (this .importNurlDB
182: .stackSize(stackTypes[stackType]) == 0)
183: break;
184:
185: this .urlCount++;
186: nextEntry = this .importNurlDB.pop(
187: stackTypes[stackType], false);
188: nextHash = nextEntry.url().hash();
189: } else {
190: if (!entryIter.hasNext())
191: break;
192:
193: this .urlCount++;
194: nextEntry = entryIter.next();
195: nextHash = nextEntry.url().hash();
196: }
197: } catch (IOException e) {
198: this .log.logWarning("Unable to import entry: "
199: + e.toString());
200:
201: if ((stackTypes[stackType] != -1)
202: && (this .importNurlDB
203: .stackSize(stackTypes[stackType]) == 0))
204: break;
205: continue;
206: }
207:
208: // getting a handler to the crawling profile the url belongs to
209: try {
210: String profileHandle = nextEntry
211: .profileHandle();
212: if (profileHandle == null) {
213: this .log
214: .logWarning("Profile handle of url entry '"
215: + nextHash + "' unknown.");
216: continue;
217: }
218:
219: // if we havn't imported the profile until yet we need to do it now
220: if (!this .importProfileHandleCache
221: .contains(profileHandle)) {
222:
223: // testing if the profile is already known
224: plasmaCrawlProfile.entry profileEntry = this .sb.profilesActiveCrawls
225: .getEntry(profileHandle);
226:
227: // if not we need to import it
228: if (profileEntry == null) {
229: // copy and store the source profile entry into the destination db
230: plasmaCrawlProfile.entry sourceEntry = this .importProfileDB
231: .getEntry(profileHandle);
232: if (sourceEntry != null) {
233: this .profileCount++;
234: this .importProfileHandleCache
235: .add(profileHandle);
236: this .sb.profilesActiveCrawls
237: .newEntry((HashMap<String, String>) sourceEntry
238: .map().clone());
239: } else {
240: this .log.logWarning("Profile '"
241: + profileHandle
242: + "' of url entry '"
243: + nextHash + "' unknown.");
244: continue;
245: }
246: }
247: }
248:
249: // if the url does not alredy exists in the destination stack we insert it now
250: if (!this .sb.crawlQueues.noticeURL
251: .existsInStack(nextHash)) {
252: this .sb.crawlQueues.noticeURL
253: .push(
254: (stackTypes[stackType] != -1) ? stackTypes[stackType]
255: : plasmaCrawlNURL.STACK_TYPE_CORE,
256: nextEntry);
257: }
258:
259: // removing hash from the import db
260: } finally {
261: this .importNurlDB.removeByURLHash(nextHash);
262: }
263:
264: if (this .urlCount % 100 == 0) {
265: this .log
266: .logFine(this .urlCount
267: + " URLs and '"
268: + this .profileCount
269: + "' profile entries processed so far.");
270: }
271: if (this .isAborted())
272: break;
273: }
274: this .log.logInfo("Finished to import stacktype '"
275: + stackTypes[stackType] + "'");
276: }
277:
278: //int size = this.importNurlDB.size();
279: //int stackSize = this.importNurlDB.stackSize();
280:
281: // TODO: what todo with nurlDB entries that do not exist in any stack?
282:
283: } catch (Exception e) {
284: this .error = e.toString();
285: this .log.logSevere("Import process had detected an error",
286: e);
287: } finally {
288: this .log.logInfo("Import process finished.");
289: this.globalEnd = System.currentTimeMillis();
290: this.sb.dbImportManager.finishedJobs.add(this);
291: this.importNurlDB.close();
292: this.importProfileDB.close();
293: }
294: }
295:
296: }
|