package org.apache.lucene.search;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;

import org.apache.lucene.index.Term;
import org.apache.lucene.util.PriorityQueue;
/** Implements parallel search over a set of <code>Searchables</code>.
 *
 * <p>Applications usually need only call the inherited {@link #search(Query)}
 * or {@link #search(Query,Filter)} methods.
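 *
 * <p>A minimal usage sketch (the index locations and the <code>query</code>
 * below are illustrative placeholders only):
 * <pre>
 *   Searchable[] searchables = new Searchable[] {
 *     new IndexSearcher("/path/to/indexA"),
 *     new IndexSearcher("/path/to/indexB")
 *   };
 *   Searcher searcher = new ParallelMultiSearcher(searchables);
 *   Hits hits = searcher.search(query);
 * </pre>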
 */
public class ParallelMultiSearcher extends MultiSearcher {

  private Searchable[] searchables;
  private int[] starts;

  /** Creates a searcher which searches <i>searchables</i>. */
  public ParallelMultiSearcher(Searchable[] searchables) throws IOException {
    super(searchables);
    this.searchables = searchables;
    this.starts = getStarts();
  }

  /**
   * TODO: parallelize this one too
   */
  public int docFreq(Term term) throws IOException {
    return super.docFreq(term);
  }

  /**
   * A search implementation which spawns a new thread for each
   * Searchable, waits for each search to complete, and merges
   * the results back together.
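   *
   * <p>Typically reached through an inherited convenience method, for example
   * (the query and the result count are assumed placeholders):
   * <pre>
   *   TopDocs top = searcher.search(query, null, 10);
   * </pre>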
   */
  public TopDocs search(Weight weight, Filter filter, int nDocs)
      throws IOException {
    HitQueue hq = new HitQueue(nDocs);
    int totalHits = 0;
    MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
    for (int i = 0; i < searchables.length; i++) { // search each searcher
      // Assume there are not too many searchables and that the cost of creating
      // a thread is far lower than the cost of a search.
      msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
          hq, i, starts, "MultiSearcher thread #" + (i + 1));
      msta[i].start();
    }

    for (int i = 0; i < searchables.length; i++) {
      try {
        msta[i].join();
      } catch (InterruptedException ie) {
        // TODO: what should we do with this???
      }
      IOException ioe = msta[i].getIOException();
      if (ioe == null) {
        totalHits += msta[i].hits();
      } else {
        // if one search produced an IOException, rethrow it
        throw ioe;
      }
    }

    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
    for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
      scoreDocs[i] = (ScoreDoc) hq.pop();

    float maxScore = (totalHits == 0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;

    return new TopDocs(totalHits, scoreDocs, maxScore);
  }

  /**
   * A search implementation allowing sorting which spawns a new thread for each
   * Searchable, waits for each search to complete, and merges
   * the results back together.
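   *
   * <p>Typically reached through an inherited convenience method, for example
   * (the query and the <code>"date"</code> sort field are assumed placeholders):
   * <pre>
   *   Hits hits = searcher.search(query, new Sort("date"));
   * </pre>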
   */
  public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort)
      throws IOException {
    // don't specify the fields - we'll wait to do this until we get results
    FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(null, nDocs);
    int totalHits = 0;
    MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
    for (int i = 0; i < searchables.length; i++) { // search each searcher
      // Assume there are not too many searchables and that the cost of creating
      // a thread is far lower than the cost of a search.
      msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
          hq, sort, i, starts, "MultiSearcher thread #" + (i + 1));
      msta[i].start();
    }

    float maxScore = Float.NEGATIVE_INFINITY;

    for (int i = 0; i < searchables.length; i++) {
      try {
        msta[i].join();
      } catch (InterruptedException ie) {
        // TODO: what should we do with this???
      }
      IOException ioe = msta[i].getIOException();
      if (ioe == null) {
        totalHits += msta[i].hits();
        maxScore = Math.max(maxScore, msta[i].getMaxScore());
      } else {
        // if one search produced an IOException, rethrow it
        throw ioe;
      }
    }

    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
    for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
      scoreDocs[i] = (ScoreDoc) hq.pop();

    return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
  }

  /** Lower-level search API.
   *
   * <p>{@link HitCollector#collect(int,float)} is called for every non-zero
   * scoring document.
   *
   * <p>Applications should only use this if they need <i>all</i> of the
   * matching documents. The high-level search API ({@link
   * Searcher#search(Query)}) is usually more efficient, as it skips
   * non-high-scoring hits.
   *
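   * <p>A minimal sketch of an anonymous collector, passed in through the
   * inherited {@link Searcher#search(Query,HitCollector)} convenience method
   * (<code>query</code> is an assumed, pre-built query):
   * <pre>
   *   searcher.search(query, new HitCollector() {
   *     public void collect(int doc, float score) {
   *       // doc is already mapped into this searcher's composite doc id space
   *       System.out.println(doc + ": " + score);
   *     }
   *   });
   * </pre>
   *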
   * @param weight to match documents
   * @param filter if non-null, a bitset used to eliminate some documents
   * @param results to receive hits
   *
   * @todo parallelize this one too
   */
  public void search(Weight weight, Filter filter, final HitCollector results)
      throws IOException {
    for (int i = 0; i < searchables.length; i++) {
      final int start = starts[i];
      searchables[i].search(weight, filter, new HitCollector() {
        public void collect(int doc, float score) {
          results.collect(doc + start, score);
        }
      });
    }
  }

  /*
   * TODO: this one could be parallelized too
   * @see org.apache.lucene.search.Searchable#rewrite(org.apache.lucene.search.Query)
   */
  public Query rewrite(Query original) throws IOException {
    return super.rewrite(original);
  }

}

/**
 * A thread subclass that runs a search against a single Searchable and merges
 * its results into a shared hit queue.
 */
class MultiSearcherThread extends Thread {

  private Searchable searchable; // the sub-searcher this thread queries
  private Weight weight;
  private Filter filter;
  private int nDocs;
  private TopDocs docs; // results of this thread's search
  private int i; // index of this searchable within the parent searcher
  private PriorityQueue hq; // shared hit queue that results are merged into
  private int[] starts; // doc id offsets of the searchables in the composite doc id space
  private IOException ioe; // exception thrown by the search, if any
  private Sort sort; // sort criteria, or null to sort by score

  public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter,
      int nDocs, HitQueue hq, int i, int[] starts, String name) {
    super(name);
    this.searchable = searchable;
    this.weight = weight;
    this.filter = filter;
    this.nDocs = nDocs;
    this.hq = hq;
    this.i = i;
    this.starts = starts;
  }

  public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter,
      int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i, int[] starts,
      String name) {
    super(name);
    this.searchable = searchable;
    this.weight = weight;
    this.filter = filter;
    this.nDocs = nDocs;
    this.hq = hq;
    this.i = i;
    this.starts = starts;
    this.sort = sort;
  }

  public void run() {
    try {
      docs = (sort == null)
          ? searchable.search(weight, filter, nDocs)
          : searchable.search(weight, filter, nDocs, sort);
    }
    // Store the IOException for later use by the caller of this thread
    catch (IOException ioe) {
      this.ioe = ioe;
    }
    if (ioe == null) {
      // if we are sorting by fields, we need to tell the field sorted hit queue
      // the actual type of fields, in case the original list contained AUTO.
      // if the searchable returns null for fields, we'll have problems.
      if (sort != null) {
        ((FieldDocSortedHitQueue) hq).setFields(((TopFieldDocs) docs).fields);
      }
      ScoreDoc[] scoreDocs = docs.scoreDocs;
      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
        ScoreDoc scoreDoc = scoreDocs[j];
        scoreDoc.doc += starts[i]; // convert the doc id into the composite doc id space
        // it would be so nice if we had a thread-safe insert
        synchronized (hq) {
          if (!hq.insert(scoreDoc))
            break; // no more scores > minScore
        }
      }
    }
  }

  public int hits() {
    return docs.totalHits;
  }

  public float getMaxScore() {
    return docs.getMaxScore();
  }

  public IOException getIOException() {
    return ioe;
  }

}