package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * An IndexReader which reads indexes with multiple segments. Document
 * numbers are made global by offsetting each sub-reader's local doc
 * numbers by the number of documents in all preceding segments (the
 * starts array built in initialize()).
 */
class MultiSegmentReader extends DirectoryIndexReader {
  protected SegmentReader[] subReaders;
  private int[] starts;                           // 1st docno for each segment
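  // cached norms: maps field name -> byte[maxDoc()] merged across all segments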
  private Hashtable normsCache = new Hashtable();
  private int maxDoc = 0;
  private int numDocs = -1;
  private boolean hasDeletions = false;

  /** Constructs a reader over the set of segments named in sis. */
  MultiSegmentReader(Directory directory, SegmentInfos sis,
                     boolean closeDirectory) throws IOException {
    super(directory, sis, closeDirectory);
    // To reduce the chance of hitting FileNotFound
    // (and having to retry), we open segments in
    // reverse because IndexWriter merges & deletes
    // the newest segments first.

    SegmentReader[] readers = new SegmentReader[sis.size()];
    for (int i = sis.size() - 1; i >= 0; i--) {
      try {
        readers[i] = SegmentReader.get(sis.info(i));
      } catch (IOException e) {
        // Close all readers we had opened:
        for (i++; i < sis.size(); i++) {
          try {
            readers[i].close();
          } catch (IOException ignore) {
            // keep going - we want to clean up as much as possible
          }
        }
        throw e;
      }
    }

    initialize(readers);
  }

  /** This constructor is only used for {@link #reopen()}. */
  MultiSegmentReader(Directory directory, SegmentInfos infos,
                     boolean closeDirectory, SegmentReader[] oldReaders,
                     int[] oldStarts, Map oldNormsCache) throws IOException {
    super(directory, infos, closeDirectory);

    // we put the old SegmentReaders in a map, which allows us
    // to look up a reader by its segment name
    Map segmentReaders = new HashMap();

    if (oldReaders != null) {
      // create a map from segment name to old reader index
      for (int i = 0; i < oldReaders.length; i++) {
        segmentReaders.put(oldReaders[i].getSegmentName(), new Integer(i));
      }
    }

    SegmentReader[] newReaders = new SegmentReader[infos.size()];

    // remember which readers are shared between the old and the re-opened
    // MultiSegmentReader - we have to incRef those readers
    boolean[] readerShared = new boolean[infos.size()];

    for (int i = infos.size() - 1; i >= 0; i--) {
      // find SegmentReader for this segment
      Integer oldReaderIndex = (Integer) segmentReaders.get(infos.info(i).name);
      if (oldReaderIndex == null) {
        // this is a new segment, no old SegmentReader can be reused
        newReaders[i] = null;
      } else {
        // there is an old reader for this segment - we'll try to reopen it
        newReaders[i] = oldReaders[oldReaderIndex.intValue()];
      }

      boolean success = false;
      try {
        SegmentReader newReader;
        if (newReaders[i] == null
            || infos.info(i).getUseCompoundFile() != newReaders[i].getSegmentInfo().getUseCompoundFile()) {
          // this is a new reader; in case we hit an exception we can close it safely
          newReader = SegmentReader.get(infos.info(i));
        } else {
          newReader = (SegmentReader) newReaders[i].reopenSegment(infos.info(i));
        }
        if (newReader == newReaders[i]) {
          // this reader will be shared between the old and the new one,
          // so we must incRef it
          readerShared[i] = true;
          newReader.incRef();
        } else {
          readerShared[i] = false;
          newReaders[i] = newReader;
        }
        success = true;
      } finally {
        if (!success) {
          for (i++; i < infos.size(); i++) {
            if (newReaders[i] != null) {
              try {
                if (!readerShared[i]) {
                  // this is a new subReader that is not used by the old one,
                  // so we can close it
                  newReaders[i].close();
                } else {
                  // this subReader is also used by the old reader, so instead
                  // of closing it we must decRef it
                  newReaders[i].decRef();
                }
              } catch (IOException ignore) {
                // keep going - we want to clean up as much as possible
              }
            }
          }
        }
      }
    }

    // initialize the readers to calculate maxDoc before we try to reuse the old normsCache
    initialize(newReaders);

    // try to copy unchanged norms from the old normsCache to the new one
    if (oldNormsCache != null) {
      Iterator it = oldNormsCache.keySet().iterator();
      while (it.hasNext()) {
        String field = (String) it.next();
        if (!hasNorms(field)) {
          continue;
        }

        byte[] oldBytes = (byte[]) oldNormsCache.get(field);

        byte[] bytes = new byte[maxDoc()];

        for (int i = 0; i < subReaders.length; i++) {
          Integer oldReaderIndex = (Integer) segmentReaders.get(subReaders[i].getSegmentName());

          // if this SegmentReader was not re-opened, we can copy all of its norms
          if (oldReaderIndex != null
              && (oldReaders[oldReaderIndex.intValue()] == subReaders[i]
                  || oldReaders[oldReaderIndex.intValue()].norms.get(field) == subReaders[i].norms.get(field))) {
            // we don't have to synchronize here: either this constructor is called from a SegmentReader,
            // in which case no old norms cache is present, or it is called from MultiReader.reopen(),
            // which is synchronized
            System.arraycopy(oldBytes, oldStarts[oldReaderIndex.intValue()],
                             bytes, starts[i], starts[i + 1] - starts[i]);
          } else {
            subReaders[i].norms(field, bytes, starts[i]);
          }
        }

        normsCache.put(field, bytes);             // update cache
      }
    }
  }

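  /**
   * Shared by both constructors: records the sub-readers and builds the
   * starts array, where starts[i] is the global doc number of the first
   * document in sub-reader i and starts[subReaders.length] == maxDoc().
   */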
  private void initialize(SegmentReader[] subReaders) {
    this.subReaders = subReaders;
    starts = new int[subReaders.length + 1];      // build starts array
    for (int i = 0; i < subReaders.length; i++) {
      starts[i] = maxDoc;
      maxDoc += subReaders[i].maxDoc();           // compute maxDocs

      if (subReaders[i].hasDeletions())
        hasDeletions = true;
    }
    starts[subReaders.length] = maxDoc;
  }

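  /**
   * Re-opens this reader against the given (newer) SegmentInfos, reusing
   * unchanged sub-readers where possible. If only one segment remains, a
   * plain SegmentReader is returned instead of a MultiSegmentReader.
   */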
  protected synchronized DirectoryIndexReader doReopen(SegmentInfos infos)
      throws CorruptIndexException, IOException {
    if (infos.size() == 1) {
      // The index has only one segment now, so we can't refresh the MultiSegmentReader.
      // Return a new SegmentReader instead
      SegmentReader newReader = SegmentReader.get(infos, infos.info(0), false);
      return newReader;
    } else {
      return new MultiSegmentReader(directory, infos, closeDirectory,
                                    subReaders, starts, normsCache);
    }
  }

  public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
    ensureOpen();
    int i = readerIndex(n);                       // find segment num
    return subReaders[i].getTermFreqVectors(n - starts[i]); // dispatch to segment
  }

  public TermFreqVector getTermFreqVector(int n, String field) throws IOException {
    ensureOpen();
    int i = readerIndex(n);                       // find segment num
    return subReaders[i].getTermFreqVector(n - starts[i], field);
  }

  public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper)
      throws IOException {
    ensureOpen();
    int i = readerIndex(docNumber);               // find segment num
    subReaders[i].getTermFreqVector(docNumber - starts[i], field, mapper);
  }

  public void getTermFreqVector(int docNumber, TermVectorMapper mapper)
      throws IOException {
    ensureOpen();
    int i = readerIndex(docNumber);               // find segment num
    subReaders[i].getTermFreqVector(docNumber - starts[i], mapper);
  }

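  // a reader over more than one segment is never optimized: an optimized
  // index consists of exactly one segment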
  public boolean isOptimized() {
    return false;
  }

  public synchronized int numDocs() {
    // Don't call ensureOpen() here (it could affect performance)
    if (numDocs == -1) {                          // check cache
      int n = 0;                                  // cache miss--recompute
      for (int i = 0; i < subReaders.length; i++)
        n += subReaders[i].numDocs();             // sum from readers
      numDocs = n;
    }
    return numDocs;
  }

  public int maxDoc() {
    // Don't call ensureOpen() here (it could affect performance)
    return maxDoc;
  }

  // inherit javadoc
  public Document document(int n, FieldSelector fieldSelector)
      throws CorruptIndexException, IOException {
    ensureOpen();
    int i = readerIndex(n);                       // find segment num
    return subReaders[i].document(n - starts[i], fieldSelector); // dispatch to segment reader
  }

  public boolean isDeleted(int n) {
    // Don't call ensureOpen() here (it could affect performance)
    int i = readerIndex(n);                       // find segment num
    return subReaders[i].isDeleted(n - starts[i]); // dispatch to segment reader
  }

  public boolean hasDeletions() {
    // Don't call ensureOpen() here (it could affect performance)
    return hasDeletions;
  }

  protected void doDelete(int n) throws CorruptIndexException, IOException {
    numDocs = -1;                                 // invalidate cache
    int i = readerIndex(n);                       // find segment num
    subReaders[i].deleteDocument(n - starts[i]);  // dispatch to segment reader
    hasDeletions = true;
  }

  protected void doUndeleteAll() throws CorruptIndexException, IOException {
    for (int i = 0; i < subReaders.length; i++)
      subReaders[i].undeleteAll();

    hasDeletions = false;
    numDocs = -1;                                 // invalidate cache
  }

  private int readerIndex(int n) {                // find reader for doc n:
    return readerIndex(n, this.starts, this.subReaders.length);
  }

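  /**
   * Binary search over the starts array for the sub-reader containing
   * global doc n. For example, with starts = {0, 5, 12, 20} and three
   * sub-readers, n = 7 falls between starts[1] and starts[2], so index 1
   * is returned. On an exact hit we scan forward to the last sub-reader
   * with the same start, which skips over empty segments.
   */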
  static int readerIndex(int n, int[] starts, int numSubReaders) { // find reader for doc n:
    int lo = 0;                                   // search starts array
    int hi = numSubReaders - 1;                   // for first element less

    while (hi >= lo) {
      int mid = (lo + hi) >> 1;
      int midValue = starts[mid];
      if (n < midValue)
        hi = mid - 1;
      else if (n > midValue)
        lo = mid + 1;
      else {                                      // found a match
        while (mid + 1 < numSubReaders && starts[mid + 1] == midValue) {
          mid++;                                  // scan to last match
        }
        return mid;
      }
    }
    return hi;
  }

  public boolean hasNorms(String field) throws IOException {
    ensureOpen();
    for (int i = 0; i < subReaders.length; i++) {
      if (subReaders[i].hasNorms(field))
        return true;
    }
    return false;
  }

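  // lazily created array of fake norms, returned for fields that have no
  // norms stored in any segment (presumably the default norm for every doc)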
  private byte[] ones;

  private byte[] fakeNorms() {
    if (ones == null)
      ones = SegmentReader.createFakeNorms(maxDoc());
    return ones;
  }

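  /**
   * Returns the norms for field across all segments as one maxDoc()-sized
   * array, reading from each sub-reader at its starts[i] offset and caching
   * the merged result per field.
   */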
  public synchronized byte[] norms(String field) throws IOException {
    ensureOpen();
    byte[] bytes = (byte[]) normsCache.get(field);
    if (bytes != null)
      return bytes;                               // cache hit
    if (!hasNorms(field))
      return fakeNorms();

    bytes = new byte[maxDoc()];
    for (int i = 0; i < subReaders.length; i++)
      subReaders[i].norms(field, bytes, starts[i]);
    normsCache.put(field, bytes);                 // update cache
    return bytes;
  }

  public synchronized void norms(String field, byte[] result, int offset)
      throws IOException {
    ensureOpen();
    byte[] bytes = (byte[]) normsCache.get(field);
    if (bytes == null && !hasNorms(field))
      bytes = fakeNorms();
    if (bytes != null) {                          // cache hit: copy and return
      System.arraycopy(bytes, 0, result, offset, maxDoc());
    } else {
      for (int i = 0; i < subReaders.length; i++) // cache miss: read from segments
        subReaders[i].norms(field, result, offset + starts[i]);
    }
  }

  protected void doSetNorm(int n, String field, byte value)
      throws CorruptIndexException, IOException {
    normsCache.remove(field);                     // clear cache
    int i = readerIndex(n);                       // find segment num
    subReaders[i].setNorm(n - starts[i], field, value); // dispatch
  }

  public TermEnum terms() throws IOException {
    ensureOpen();
    return new MultiTermEnum(subReaders, starts, null);
  }

  public TermEnum terms(Term term) throws IOException {
    ensureOpen();
    return new MultiTermEnum(subReaders, starts, term);
  }

  public int docFreq(Term t) throws IOException {
    ensureOpen();
    int total = 0;                                // sum freqs in segments
    for (int i = 0; i < subReaders.length; i++)
      total += subReaders[i].docFreq(t);
    return total;
  }

  public TermDocs termDocs() throws IOException {
    ensureOpen();
    return new MultiTermDocs(subReaders, starts);
  }

  public TermPositions termPositions() throws IOException {
    ensureOpen();
    return new MultiTermPositions(subReaders, starts);
  }

  protected void commitChanges() throws IOException {
    for (int i = 0; i < subReaders.length; i++)
      subReaders[i].commit();
  }

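  // startCommit/rollbackCommit propagate the two-phase commit bookkeeping
  // of DirectoryIndexReader down to every sub-reader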
  void startCommit() {
    super.startCommit();
    for (int i = 0; i < subReaders.length; i++) {
      subReaders[i].startCommit();
    }
  }

  void rollbackCommit() {
    super.rollbackCommit();
    for (int i = 0; i < subReaders.length; i++) {
      subReaders[i].rollbackCommit();
    }
  }

  protected synchronized void doClose() throws IOException {
    for (int i = 0; i < subReaders.length; i++)
      subReaders[i].decRef();

    // maybe close directory
    super.doClose();
  }

  public Collection getFieldNames(IndexReader.FieldOption fieldNames) {
    ensureOpen();
    return getFieldNames(fieldNames, this.subReaders);
  }

  static Collection getFieldNames(IndexReader.FieldOption fieldNames,
                                  IndexReader[] subReaders) {
    // maintain a unique set of field names
    Set fieldSet = new HashSet();
    for (int i = 0; i < subReaders.length; i++) {
      IndexReader reader = subReaders[i];
      Collection names = reader.getFieldNames(fieldNames);
      fieldSet.addAll(names);
    }
    return fieldSet;
  }

  // for testing
  SegmentReader[] getSubReaders() {
    return subReaders;
  }

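  // the term infos index divisor is passed through to every sub-reader;
  // getTermInfosIndexDivisor assumes all sub-readers share one value and
  // reads it back from the first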
  public void setTermInfosIndexDivisor(int indexDivisor) throws IllegalStateException {
    for (int i = 0; i < subReaders.length; i++)
      subReaders[i].setTermInfosIndexDivisor(indexDivisor);
  }

  public int getTermInfosIndexDivisor() throws IllegalStateException {
    if (subReaders.length > 0)
      return subReaders[0].getTermInfosIndexDivisor();
    else
      throw new IllegalStateException("no readers");
  }

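  /**
   * A TermEnum that merges the enums of all sub-readers with a priority
   * queue, returning each distinct term once with its docFreq summed
   * across segments.
   */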
  static class MultiTermEnum extends TermEnum {
    private SegmentMergeQueue queue;

    private Term term;
    private int docFreq;

    public MultiTermEnum(IndexReader[] readers, int[] starts, Term t)
        throws IOException {
      queue = new SegmentMergeQueue(readers.length);
      for (int i = 0; i < readers.length; i++) {
        IndexReader reader = readers[i];
        TermEnum termEnum;

        if (t != null) {
          termEnum = reader.terms(t);
        } else {
          termEnum = reader.terms();
        }

        SegmentMergeInfo smi = new SegmentMergeInfo(starts[i], termEnum, reader);
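        // if t is null the enum starts before the first term and must be
        // advanced once; otherwise reader.terms(t) has already positioned
        // it, so we only need to check that a term exists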
        if (t == null ? smi.next() : termEnum.term() != null)
          queue.put(smi);                         // initialize queue
        else
          smi.close();
      }

      if (t != null && queue.size() > 0) {
        next();
      }
    }

    public boolean next() throws IOException {
      SegmentMergeInfo top = (SegmentMergeInfo) queue.top();
      if (top == null) {
        term = null;
        return false;
      }

      term = top.term;
      docFreq = 0;

      while (top != null && term.compareTo(top.term) == 0) {
        queue.pop();
        docFreq += top.termEnum.docFreq();        // increment freq
        if (top.next())
          queue.put(top);                         // restore queue
        else
          top.close();                            // done with a segment
        top = (SegmentMergeInfo) queue.top();
      }
      return true;
    }

    public Term term() {
      return term;
    }

    public int docFreq() {
      return docFreq;
    }

    public void close() throws IOException {
      queue.close();
    }
  }

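  /**
   * A TermDocs over multiple sub-readers: segments are consumed one after
   * another, and local doc numbers are mapped to global ones by adding the
   * segment's base (its entry in starts).
   */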
  static class MultiTermDocs implements TermDocs {
    protected IndexReader[] readers;
    protected int[] starts;
    protected Term term;

    protected int base = 0;
    protected int pointer = 0;

    private TermDocs[] readerTermDocs;
    protected TermDocs current;                   // == readerTermDocs[pointer]

    public MultiTermDocs(IndexReader[] r, int[] s) {
      readers = r;
      starts = s;

      readerTermDocs = new TermDocs[r.length];
    }

    public int doc() {
      return base + current.doc();
    }

    public int freq() {
      return current.freq();
    }

    public void seek(Term term) {
      this.term = term;
      this.base = 0;
      this.pointer = 0;
      this.current = null;
    }

    public void seek(TermEnum termEnum) throws IOException {
      seek(termEnum.term());
    }

    public boolean next() throws IOException {
      for (;;) {
        if (current != null && current.next()) {
          return true;
        } else if (pointer < readers.length) {
          base = starts[pointer];
          current = termDocs(pointer++);
        } else {
          return false;
        }
      }
    }

    /** Optimized implementation. */
    public int read(final int[] docs, final int[] freqs) throws IOException {
      while (true) {
        while (current == null) {
          if (pointer < readers.length) {         // try next segment
            base = starts[pointer];
            current = termDocs(pointer++);
          } else {
            return 0;
          }
        }
        int end = current.read(docs, freqs);
        if (end == 0) {                           // none left in segment
          current = null;
        } else {                                  // got some
          final int b = base;                     // adjust doc numbers
          for (int i = 0; i < end; i++)
            docs[i] += b;
          return end;
        }
      }
    }

    /* A possible future optimization could skip entire segments */
    public boolean skipTo(int target) throws IOException {
      for (;;) {
        if (current != null && current.skipTo(target - base)) {
          return true;
        } else if (pointer < readers.length) {
          base = starts[pointer];
          current = termDocs(pointer++);
        } else
          return false;
      }
    }

    private TermDocs termDocs(int i) throws IOException {
      if (term == null)
        return null;
      TermDocs result = readerTermDocs[i];
      if (result == null)
        result = readerTermDocs[i] = termDocs(readers[i]);
      result.seek(term);
      return result;
    }

    protected TermDocs termDocs(IndexReader reader) throws IOException {
      return reader.termDocs();
    }

    public void close() throws IOException {
      for (int i = 0; i < readerTermDocs.length; i++) {
        if (readerTermDocs[i] != null)
          readerTermDocs[i].close();
      }
    }
  }

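  /**
   * A TermPositions over multiple sub-readers; inherits the doc-number
   * mapping from MultiTermDocs and delegates position and payload access
   * to the current segment's TermPositions.
   */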
  static class MultiTermPositions extends MultiTermDocs implements TermPositions {
    public MultiTermPositions(IndexReader[] r, int[] s) {
      super(r, s);
    }

    protected TermDocs termDocs(IndexReader reader) throws IOException {
      return (TermDocs) reader.termPositions();
    }

    public int nextPosition() throws IOException {
      return ((TermPositions) current).nextPosition();
    }

    public int getPayloadLength() {
      return ((TermPositions) current).getPayloadLength();
    }

    public byte[] getPayload(byte[] data, int offset) throws IOException {
      return ((TermPositions) current).getPayload(data, offset);
    }

    // TODO: Remove warning after API has been finalized
    public boolean isPayloadAvailable() {
      return ((TermPositions) current).isPayloadAvailable();
    }
  }
}