package bdd.search.spider;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import bdd.util.FIFOQueue;
import bdd.search.EnginePrefs;
import bdd.search.Monitor;

/** Written by Tim Macinta 1997 <br>
 * Distributed under the GNU Public License
 * (a copy of which is enclosed with the source). <br>
 * <br>
 * The Indexer is a thread which can index URLs that have been
 * cached using the URLStatus class. Use the queueURL() method
 * to add cached URLs to the Indexer's list of URLs. Once the
 * start() method is called, the Indexer will start processing
 * URLs in its queue. More URLs can also be added after calling
 * start(); in fact, this may be the best way to use the Indexer.
 * Calling the stopWhenDone() method will cause the Indexer
 * thread to stop as soon as its queue empties.
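 * <p>
 * A minimal usage sketch (how the working directory, Crawler, EnginePrefs,
 * and cached URLStatus are obtained is assumed; only the Indexer calls
 * below come from this class):
 * <pre>
 *   Indexer indexer = new Indexer(workingDir, crawler, prefs);
 *   indexer.start();               // begin processing queued URLs
 *   indexer.queueURL(cachedUrl);   // URLs may be queued after start()
 *   indexer.stopWhenDone(false);   // stop once the queue empties
 * </pre>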
 */

public class Indexer extends Thread {

    File working_dir;               // a temporary working directory
    FIFOQueue q = new FIFOQueue();  // queue for cached URLs
    Object q_mutex = new Object();  // synchronization mutex
    boolean running = false;        // set to false when the Thread should stop
    Crawler crawler;                // the crawler that retrieves URLs
    EnginePrefs prefs;              // preferences
    boolean exit_when_done = false; // exit when done indexing?
    long total_bytes = 0;           // total number of bytes indexed

    // names for temporary files

    static final String TMP_NAME = "tmp.db";
    static final String TMP_NAME_2 = "tmp2.db";

    /** "working_dir" should be a directory that only this
     * Indexer and a given Crawler will be
     * accessing. This means that if several Indexers are running
     * simultaneously, they should all be given different "working_dir"
     * directories. Also, no other threads should write to this
     * directory (except for the selected Crawler).
     */
    public Indexer(File working_dir, Crawler crawler, EnginePrefs prefs) {
        this.working_dir = working_dir;
        this.crawler = crawler;
        this.prefs = prefs;
        cleanUp(); // remove all temporary files
    }

    /** Use this method to add a cached URL to the Indexer. */
    public void queueURL(URLStatus url) {
        if (url == null)
            return;
        synchronized (q_mutex) {
            q.addElement(url);
        }
    }

    /** Starts the Indexer. */
    public void start() {
        running = true;
        super.start();
    }

    /** This is where the actual indexing is done. */
    public void run() {

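        // Main loop: for each cached URL taken from the queue, write its
        // index data to a fresh temporary database, hand any extracted
        // links to the crawler, and fold the temporary database into the
        // cascade of partially merged databases (see mergeDatabases()).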
        URLStatus url;
        while (q.hasMoreElements() || running) {
            if (q.hasMoreElements()) {
                synchronized (q_mutex) {
                    url = (URLStatus) q.readNextElement();
                }
                try {
                    total_bytes += url.getContentLength();
                    Monitor m = prefs.getMonitor();
                    if (m != null)
                        m.bytesIndexed(total_bytes);
                    File db = new File(working_dir, TMP_NAME);
                    DataOutputStream out = new DataOutputStream(
                        new BufferedOutputStream(new FileOutputStream(db)));
                    url.dumpToDatabase(out);
                    out.flush();
                    out.close();
                    addNewURLs(url.getLinkExtractor());
                    synchronized (q_mutex) {
                        q.nextElement();
                    }
                    mergeDatabases(db);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else {

                // Nothing in queue so sleep for a few seconds

                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
            }
        }
        try {
            replaceMainIndex(); // replace main index
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.gc();
        cleanUp(); // remove all temporary files
        if (exit_when_done) {
            System.exit(0);
        }
    }

    /** Causes this Indexer to stop whenever it finishes indexing the URLs
     * in its queue. */
    public void stopWhenDone(boolean exit_when_done) {
        running = false;
        this.exit_when_done = exit_when_done;
    }

    /** Removes all the ".db" and ".tmp" files in the directory "working_dir". */
    void cleanUp() {
        String[] files = working_dir.list();
        for (int i = 0; i < files.length; i++) {
            if (files[i].endsWith(".db") || files[i].endsWith(".tmp")) {
                new File(working_dir, files[i]).delete();
            }
        }
    }

    /** Repeatedly attempts to merge "temporary" with other temporary
     * databases which have been merged the same number of times. In other
     * words, this method will first try to merge "temporary" with any
     * databases that haven't been merged yet. If that is successful,
     * this database will then be merged with any databases that have been
     * merged once. If that is successful, this database will then be
     * merged with any databases that have been merged twice... and
     * so on and so forth.
     * <p>
     * Databases are named based on the number of times they have been
     * merged. E.g., a file called "6.db" will have been merged six times while
     * a file called "9.db" will have been merged nine times. It is assumed
     * that the "temporary" file has not been merged at all.
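     * <p>
     * For example (an illustration of the naming scheme above, not taken
     * from a real run): if "0.db" and "1.db" already exist, the new
     * temporary database is merged with "0.db" (both are then deleted),
     * the result is merged with "1.db" (both are then deleted), and since
     * no "2.db" exists yet, the final result is renamed to "2.db".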
     */
    void mergeDatabases(File temporary) throws IOException {
        int i = 0;
        File f = new File(working_dir, i + ".db");
        File tmp2 = new File(working_dir, TMP_NAME_2);
        while (f.exists()) {
            merge(temporary, f, tmp2);
            temporary.delete();
            f.delete();
            tmp2.renameTo(temporary);
            i++;
            f = new File(working_dir, i + ".db");
        }
        temporary.renameTo(f);
    }

    /** Adds new URLs to the crawler's queue. */
    void addNewURLs(LinkExtractor urls) {
        while (urls.hasMoreElements()) {
            crawler.addURL((URL) urls.nextElement());
        }
    }

    /** Completes the merging of all temporary databases and replaces the
     * main database with the final product.
     */
    void replaceMainIndex() throws IOException {

        // merge all existing databases

        String[] files = working_dir.list();
        int targ;
        boolean good;
        int count = 0;
        File tmp1 = new File(working_dir, TMP_NAME);
        File tmp2 = new File(working_dir, TMP_NAME_2);
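        // Only files whose names are all digits followed by ".db" (i.e. the
        // partially merged databases produced by mergeDatabases()) are
        // folded into "tmp1" here; other files are left alone.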
        for (int i = files.length - 1; i >= 0; i--) {
            if (files[i].endsWith(".db")) {
                targ = files[i].length() - 3;
                if (targ > 0) {
                    good = true;
                    for (int x = 0; x < targ; x++) {
                        if (!Character.isDigit(files[i].charAt(x))) {
                            good = false;
                            break;
                        }
                    }
                    if (good) {
                        count++;
                        File f = new File(working_dir, files[i]);
                        merge(tmp1, f, tmp2);
                        tmp1.delete();
                        tmp2.renameTo(tmp1);
                        f.delete();
                    }
                }
            }
        }

        // replace main database

        if (count < 1)
            return;
        File main_index = prefs.getMainIndex();
        main_index.delete();
        tmp1.renameTo(main_index);
    }

    /** Takes two search databases, "file1" and "file2", and merges their
     * contents with the results being placed in "target". "file2" must
     * exist, but "file1" need not. If "file1" does not exist then
     * "file2" is copied to "target".
     */
    void merge(File file1, File file2, File target) throws IOException {

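        // The database layout, as implied by the reads and writes below
        // (inferred from this method rather than from separate
        // documentation): an 8-byte pointer to the start of the word
        // section, a 4-byte URL count, one text line per URL description,
        // then for each word a text line followed by (4-byte URL number,
        // 1-byte score) pairs terminated by a URL number of 0.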
        // copy file2 if file1 doesn't exist

        if (!file1.exists()) {
            OutputStream out = new BufferedOutputStream(
                new FileOutputStream(target));
            pipe(new FileInputStream(file2), out);
            out.flush();
            out.close();
            return;
        }

        // file1 does exist

        DataInputStream in1 = new DataInputStream(new FileInputStream(file1));
        DataInputStream in2 = new DataInputStream(new FileInputStream(file2));
        DataOutputStream out = new DataOutputStream(
            new BufferedOutputStream(new FileOutputStream(target)));

        // merge headers

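        // The merged word-section pointer is the two input pointers added
        // together minus one 12-byte header (8-byte long + 4-byte int),
        // since the combined file carries only a single header.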
        out.writeLong(in1.readLong() + in2.readLong() - 8 - 4); // pointer to words
        int offset1 = in1.readInt();
        int offset2 = in2.readInt();
        out.writeInt(offset1 + offset2); // number of URLs

        // dump URL descriptions

        for (int i = 0; i < offset1; i++) {
            out.writeBytes(in1.readLine() + '\n');
        }
        for (int i = 0; i < offset2; i++) {
            out.writeBytes(in2.readLine() + '\n');
        }

        // dump actual words

        int url_num;
        String word1 = in1.readLine();
        String word2 = in2.readLine();
        while (word1 != null || word2 != null) {
            if (word1 == null) {

                // words from "file1" are done

                while (word2 != null) {
                    out.writeBytes(word2 + '\n');
                    while (true) {
                        url_num = in2.readInt();
                        if (url_num == 0)
                            break;
                        out.writeInt(offset1 + url_num);
                        out.write(in2.read());
                    }
                    out.writeInt(0);
                    word2 = in2.readLine();
                }
            } else if (word2 == null) {

                // words from "file2" are done

                out.writeBytes(word1 + '\n');
                pipe(in1, out);
                word1 = null;
            } else {

                // still merging both files

                int compare = word1.compareTo(word2);
                if (compare == 0) {

                    // same word in both databases

                    int u1 = in1.readInt();
                    int u2 = in2.readInt();
                    out.writeBytes(word1 + '\n');
                    while (u1 != 0 || u2 != 0) {
                        if (u1 == 0) {

                            // word1 is done

                            while (u2 != 0) {
                                out.writeInt(u2 + offset1);
                                out.write(in2.read());
                                u2 = in2.readInt();
                            }
                        } else if (u2 == 0) {

                            // word2 is done

                            while (u1 != 0) {
                                out.writeInt(u1);
                                out.write(in1.read());
                                u1 = in1.readInt();
                            }
                        } else {

                            // neither is done yet

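                            // Both entry lists appear to already be ordered
                            // by descending score, so the loop below performs
                            // a two-way merge, always emitting the higher
                            // scored entry first (URL numbers from "file2"
                            // are shifted by offset1).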
                            int s1 = ((int) in1.read()) & 0xff;
                            int s2 = ((int) in2.read()) & 0xff;
                            while (true) {
                                if (s1 > s2) {

                                    // the URL in file1 has a higher score

                                    out.writeInt(u1);
                                    out.write(s1);
                                    u1 = in1.readInt();
                                    if (u1 == 0) {
                                        out.writeInt(u2 + offset1);
                                        out.write(s2);
                                        u2 = in2.readInt();
                                        break;
                                    } else {
                                        s1 = ((int) in1.read()) & 0xff;
                                    }
                                } else {

                                    // the URL in file2 has a higher score

                                    out.writeInt(u2 + offset1);
                                    out.write(s2);
                                    u2 = in2.readInt();
                                    if (u2 == 0) {
                                        out.writeInt(u1);
                                        out.write(s1);
                                        u1 = in1.readInt();
                                        break;
                                    } else {
                                        s2 = ((int) in2.read()) & 0xff;
                                    }
                                }
                            }
                        }
                    }
                    out.writeInt(0);
                    word1 = in1.readLine();
                    word2 = in2.readLine();
                } else if (compare > 0) {

                    // dump word2

                    out.writeBytes(word2 + '\n');
                    while (true) {
                        url_num = in2.readInt();
                        if (url_num == 0)
                            break;
                        out.writeInt(offset1 + url_num);
                        out.write(in2.read());
                    }
                    out.writeInt(0);
                    word2 = in2.readLine();
                } else { // compare < 0

                    // dump word1

                    out.writeBytes(word1 + '\n');
                    while (true) {
                        url_num = in1.readInt();
                        out.writeInt(url_num);
                        if (url_num == 0)
                            break;
                        out.write(in1.read());
                    }
                    word1 = in1.readLine();
                }
            }
        }

        // close all files

        out.flush();
        in1.close();
        in2.close();
        out.close();
    }

    /** Pipes "in" to "out" until "in" is exhausted, then closes "in". */
    void pipe(InputStream in, OutputStream out) throws IOException {
        byte[] b = new byte[512];
        int x = in.read(b, 0, b.length);
        while (x > 0) {
            out.write(b, 0, x);
            x = in.read(b, 0, b.length);
        }
        in.close();
    }

}