/*
 * Copyright 2005 by Lars Torunski
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package com.torunski.crawler;

import java.util.Collection;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.torunski.crawler.core.AbstractCrawler;
import com.torunski.crawler.link.Link;
import com.torunski.crawler.parser.PageData;
import com.torunski.crawler.util.StopWatch;

import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
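// Note: edu.emory.mathcs.backport is the JSR-166 backport library for pre-1.5
// JVMs; its types mirror java.util.concurrent, so on Java 5+ the three imports
// above map one-to-one onto their java.util.concurrent equivalents.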

/**
 * Project: Smart & Simple Web Crawler
 *
 * Crawls the web using multiple threads, with separate pools for loading and
 * parsing pages.
 *
 * @author Lars Torunski
 * @version $Revision: 1.8 $
 */
public class MultiThreadedCrawler extends AbstractCrawler {

    private static final Log log = LogFactory.getLog(MultiThreadedCrawler.class);

    private final Object UPDATE_LOCK = new Object();

    private ThreadPoolExecutor loadService;
    private ThreadPoolExecutor parseService;

    /** Number of load/parse tasks still in flight; guarded by UPDATE_LOCK. */
    private int threadsToBeFinished;

    /**
     * Default constructor for 5 loading threads and 2 parsing threads.
     */
    public MultiThreadedCrawler() {
        this(5, 2);
    }

    /**
     * Creates a multi-threaded crawler which delegates the load and parse tasks
     * to separate thread pools. Note that per RFC 2616 sec 8.1.4 a single-user
     * client should not maintain more than 2 connections with any server.
     * @see org.apache.commons.httpclient.MultiThreadedHttpConnectionManager#DEFAULT_MAX_TOTAL_CONNECTIONS
     * @param maxLoadThreads maximum number of threads for loading content
     * @param maxParseThreads maximum number of threads for parsing downloaded pages
     */
    public MultiThreadedCrawler(int maxLoadThreads, int maxParseThreads) {
        // fixed-size pools over unbounded queues: core size equals maximum
        // size, so the keep-alive time of 0 ms never takes effect
        loadService = new ThreadPoolExecutor(maxLoadThreads, maxLoadThreads,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        parseService = new ThreadPoolExecutor(maxParseThreads, maxParseThreads,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }
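
    /*
     * Usage sketch (illustrative, not part of the class): a minimal way to
     * drive this crawler, assuming AbstractCrawler exposes a setModel setter
     * for the `model` field referenced below.
     *
     *   MultiThreadedCrawler crawler = new MultiThreadedCrawler(5, 2);
     *   crawler.setModel(new com.torunski.crawler.model.MaxDepthModel(2));
     *   crawler.start("http://www.example.com", "/index.html");
     */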

    /**
     * Starts the crawling process in a multi-threaded environment.
     */
    public void start(String server, String start) {

        // set the default parser
        if (parser == null) {
            log.debug("No parser set, defaulting to SimpleHttpClientParser.");
            parser = new com.torunski.crawler.parser.httpclient.SimpleHttpClientParser(true);
        }

        // set the default crawler model
        if (model == null) {
            log.debug("No model set, defaulting to MaxDepthModel.");
            // TODO remove parameter!
            model = new com.torunski.crawler.model.MaxDepthModel(1);
        }

        // initialize the stop watch
        StopWatch total = new StopWatch();
        total.start();

        // add at least one link to the list
        model.add(null, server + start);

        // start the crawling process
        start();

        total.stop();

        // output some statistics
        if (log.isInfoEnabled()) {

            Collection visitedURIs = model.getVisitedURIs();
            Collection toVisitURIs = model.getToVisitURIs();

            log.info("Visited URIs: " + visitedURIs.size());

            if (toVisitURIs.size() > 0) {
                log.warn("URIs still to be visited: at least " + toVisitURIs.size());
            }

            // output stop watch data
            log.info("Total time: " + total.getTime() + " ms");
        }
    }

    /**
     * Starts the crawling process in a multi-threaded environment.
     * @see com.torunski.crawler.core.ICrawler#start()
     */
    public void start() {
        // loop until no URIs are left and all in-flight tasks have finished
        while (!isFinished()) {

            synchronized (UPDATE_LOCK) {
                if (!model.isEmpty()) {
                    // remove a link from the stack and hand it to the load pool
                    Link link = model.pop();
                    loadService.execute(new LoadTask(link));
                }
            }

            try {
                // back off briefly while the worker threads produce new links
                if (model.isEmpty()) {
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                log.info("Sleep of " + this.getClass().getName() + " interrupted", e);
            }

        }

        loadService.shutdown();
        parseService.shutdown();
    }

    private boolean isFinished() {
        synchronized (UPDATE_LOCK) {
            // FIXME find a better way to check whether any tasks are still in flight
            // finished = no URIs queued and no load/parse task currently running
            return model.isEmpty() && (threadsToBeFinished == 0);
        }
    }
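
    /*
     * Why the counter works (a sketch of the invariant, not normative): a
     * LoadTask increments threadsToBeFinished in its constructor, which runs
     * while the main loop still holds UPDATE_LOCK, and a ParseTask is counted
     * before the LoadTask that spawned it decrements itself. The counter can
     * therefore only reach 0 once every chained task has completed, e.g.:
     *
     *   pop link        -> count 1 (+LoadTask)
     *   page loaded OK  -> count 2 (+ParseTask), then 1 (-LoadTask)
     *   links extracted -> count 0 (-ParseTask), model possibly refilled
     */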

    private class LoadTask implements Runnable {

        private final Link link;

        public LoadTask(Link link) {
            // count this task as in flight before it is queued
            synchronized (UPDATE_LOCK) {
                threadsToBeFinished++;
            }
            this.link = link;
            log.debug("LoadTask created for link: " + link.getURI());
        }

        public void run() {
            log.debug("LoadTask running for link: " + link.getURI());

            try {
                // load the page of the link
                PageData pageData = parser.load(link);

                if (pageData.getStatus() == PageData.OK) {
                    // the ParseTask counts itself before this task is
                    // uncounted, so isFinished() cannot report true in between
                    parseService.execute(new ParseTask(link, pageData));
                }
            } finally {
                synchronized (UPDATE_LOCK) {
                    threadsToBeFinished--;
                }
            }

            log.debug("LoadTask finished for link: " + link.getURI());
        }

    }

    private class ParseTask implements Runnable {

        private final Link link;
        private final PageData pageData;

        public ParseTask(Link link, PageData pageData) {
            // count this task as in flight before it is queued
            synchronized (UPDATE_LOCK) {
                threadsToBeFinished++;
            }
            this.link = link;
            this.pageData = pageData;
            log.debug("ParseTask created for link: " + link.getURI());
        }

        public void run() {
            log.debug("ParseTask running for link: " + link.getURI());

            try {
                // extract the links of the page
                Collection newURIs = parser.parse(pageData, linkFilter);

                // update the model
                synchronized (UPDATE_LOCK) {
                    fireParserEvent(link, pageData, newURIs);

                    // remove already visited URIs from the new URI list
                    newURIs.removeAll(model.getVisitedURIs());

                    // the remaining URIs still have to be visited
                    model.add(link, newURIs);
                }
            } finally {
                synchronized (UPDATE_LOCK) {
                    threadsToBeFinished--;
                }
            }

            log.debug("ParseTask finished for link: " + link.getURI());
        }

    }

}
|