package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.Vector;
import java.util.Iterator;
import java.util.Collection;
import java.io.IOException;

import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IndexInput;
/**
 * The SegmentMerger class combines two or more Segments, each represented by an
 * IndexReader ({@link #add}), into a single Segment. After adding the appropriate
 * readers, call the merge method to combine the segments.
 * <p>
 * If the compoundFile flag is set, then the segments will be merged into a compound file.
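 * <p>
 * Illustrative usage sketch (the directory, segment name and readers below are
 * assumed for the example, not defined by this class):
 * <pre>
 *   SegmentMerger merger = new SegmentMerger(dir, "mergedSegment");
 *   merger.add(readerA);
 *   merger.add(readerB);
 *   int docCount = merger.merge();    // writes the merged segment files
 *   merger.closeReaders();
 * </pre>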
 *
 * @see #merge
 * @see #add
 */
final class SegmentMerger {

  /** norms header placeholder */
  static final byte[] NORMS_HEADER = new byte[] { 'N', 'R', 'M', -1 };

  private Directory directory;
  private String segment;
  private int termIndexInterval = IndexWriter.DEFAULT_TERM_INDEX_INTERVAL;

  private Vector readers = new Vector();
  private FieldInfos fieldInfos;

  private int mergedDocs;

  private CheckAbort checkAbort;

  // Whether we should merge doc stores (stored fields and
  // vectors files). When all segments we are merging
  // already share the same doc store files, we don't need
  // to merge the doc stores.
  private boolean mergeDocStores;

  /** Maximum number of contiguous documents to bulk-copy
      when merging stored fields */
  private final static int MAX_RAW_MERGE_DOCS = 4192;
  /** This ctor is used only by test code.
   *
   * @param dir The Directory to merge the other segments into
   * @param name The name of the new segment
   */
  SegmentMerger(Directory dir, String name) {
    directory = dir;
    segment = name;
  }

  SegmentMerger(IndexWriter writer, String name, MergePolicy.OneMerge merge) {
    directory = writer.getDirectory();
    segment = name;
    if (merge != null)
      checkAbort = new CheckAbort(merge, directory);
    termIndexInterval = writer.getTermIndexInterval();
  }

  /**
   * Add an IndexReader to the collection of readers that are to be merged
   * @param reader the reader to add
   */
  final void add(IndexReader reader) {
    readers.addElement(reader);
  }

  /**
   *
   * @param i The index of the reader to return
   * @return The ith reader to be merged
   */
  final IndexReader segmentReader(int i) {
    return (IndexReader) readers.elementAt(i);
  }

  /**
   * Merges the readers specified by the {@link #add} method into the directory passed to the constructor
   * @return The number of documents that were merged
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  final int merge() throws CorruptIndexException, IOException {
    return merge(true);
  }

  /**
   * Merges the readers specified by the {@link #add} method
   * into the directory passed to the constructor.
   * @param mergeDocStores if false, we will not merge the
   * stored fields nor vectors files
   * @return The number of documents that were merged
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  final int merge(boolean mergeDocStores) throws CorruptIndexException, IOException {

    this.mergeDocStores = mergeDocStores;

    // NOTE: it's important to add calls to
    // checkAbort.work(...) if you make any changes to this
    // method that will spend a lot of time. The frequency
    // of this check impacts how long
    // IndexWriter.close(false) takes to actually stop the
    // threads.

    mergedDocs = mergeFields();
    mergeTerms();
    mergeNorms();

    if (mergeDocStores && fieldInfos.hasVectors())
      mergeVectors();

    return mergedDocs;
  }

  /**
   * close all IndexReaders that have been added.
   * Should not be called before merge().
   * @throws IOException
   */
  final void closeReaders() throws IOException {
    for (int i = 0; i < readers.size(); i++) {  // close readers
      IndexReader reader = (IndexReader) readers.elementAt(i);
      reader.close();
    }
  }

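  /**
   * Bundles the files of the just-merged segment into a single compound file and
   * returns the list of files that were added to it.
   *
   * <p>Illustrative call only; the ".cfs" suffix below is an assumption about how a
   * caller would typically name the compound file, not something this method requires:
   * <pre>
   *   Vector bundled = merger.createCompoundFile(mergedName + ".cfs");
   * </pre>
   */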
  final Vector createCompoundFile(String fileName) throws IOException {
    CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, fileName, checkAbort);

    Vector files = new Vector(IndexFileNames.COMPOUND_EXTENSIONS.length + 1);

    // Basic files
    for (int i = 0; i < IndexFileNames.COMPOUND_EXTENSIONS.length; i++) {
      String ext = IndexFileNames.COMPOUND_EXTENSIONS[i];
      if (mergeDocStores || (!ext.equals(IndexFileNames.FIELDS_EXTENSION) &&
                             !ext.equals(IndexFileNames.FIELDS_INDEX_EXTENSION)))
        files.add(segment + "." + ext);
    }

    // Fieldable norm files
    for (int i = 0; i < fieldInfos.size(); i++) {
      FieldInfo fi = fieldInfos.fieldInfo(i);
      if (fi.isIndexed && !fi.omitNorms) {
        files.add(segment + "." + IndexFileNames.NORMS_EXTENSION);
        break;
      }
    }

    // Vector files
    if (fieldInfos.hasVectors() && mergeDocStores) {
      for (int i = 0; i < IndexFileNames.VECTOR_EXTENSIONS.length; i++) {
        files.add(segment + "." + IndexFileNames.VECTOR_EXTENSIONS[i]);
      }
    }

    // Now merge all added files
    Iterator it = files.iterator();
    while (it.hasNext()) {
      cfsWriter.addFile((String) it.next());
    }

    // Perform the merge
    cfsWriter.close();

    return files;
  }

  private void addIndexed(IndexReader reader, FieldInfos fieldInfos, Collection names,
                          boolean storeTermVectors, boolean storePositionWithTermVector,
                          boolean storeOffsetWithTermVector, boolean storePayloads)
      throws IOException {
    Iterator i = names.iterator();
    while (i.hasNext()) {
      String field = (String) i.next();
      fieldInfos.add(field, true, storeTermVectors, storePositionWithTermVector,
                     storeOffsetWithTermVector, !reader.hasNorms(field), storePayloads);
    }
  }

  /**
   *
   * @return The number of documents in all of the readers
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  private final int mergeFields() throws CorruptIndexException, IOException {

    if (!mergeDocStores) {
      // When we are not merging by doc stores, that means
      // all segments were written as part of a single
      // autoCommit=false IndexWriter session, so their field
      // name -> number mappings are the same. So, we start
      // with the fieldInfos of the last segment in this
      // case, to keep that numbering.
      final SegmentReader sr = (SegmentReader) readers.elementAt(readers.size() - 1);
      fieldInfos = (FieldInfos) sr.fieldInfos.clone();
    } else {
      fieldInfos = new FieldInfos();            // merge field names
    }

    for (int i = 0; i < readers.size(); i++) {
      IndexReader reader = (IndexReader) readers.elementAt(i);
      if (reader instanceof SegmentReader) {
        SegmentReader segmentReader = (SegmentReader) reader;
        for (int j = 0; j < segmentReader.getFieldInfos().size(); j++) {
          FieldInfo fi = segmentReader.getFieldInfos().fieldInfo(j);
          fieldInfos.add(fi.name, fi.isIndexed, fi.storeTermVector,
                         fi.storePositionWithTermVector, fi.storeOffsetWithTermVector,
                         !reader.hasNorms(fi.name), fi.storePayloads);
        }
      } else {
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR_WITH_POSITION_OFFSET), true, true, true, false);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR_WITH_POSITION), true, true, false, false);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR_WITH_OFFSET), true, false, true, false);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR), true, false, false, false);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.STORES_PAYLOADS), false, false, false, true);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.INDEXED), false, false, false, false);
        fieldInfos.add(reader.getFieldNames(IndexReader.FieldOption.UNINDEXED), false);
      }
    }
    fieldInfos.write(directory, segment + ".fnm");

    int docCount = 0;

    if (mergeDocStores) {

      // If the i'th reader is a SegmentReader and has
      // identical fieldName -> number mapping, then this
      // array will be non-null at position i:
      SegmentReader[] matchingSegmentReaders = new SegmentReader[readers.size()];

      // If this reader is a SegmentReader, and all of its
      // field name -> number mappings match the "merged"
      // FieldInfos, then we can do a bulk copy of the
      // stored fields:
      for (int i = 0; i < readers.size(); i++) {
        IndexReader reader = (IndexReader) readers.elementAt(i);
        if (reader instanceof SegmentReader) {
          SegmentReader segmentReader = (SegmentReader) reader;
          boolean same = true;
          FieldInfos segmentFieldInfos = segmentReader.getFieldInfos();
          for (int j = 0; same && j < segmentFieldInfos.size(); j++)
            same = fieldInfos.fieldName(j).equals(segmentFieldInfos.fieldName(j));
          if (same) {
            matchingSegmentReaders[i] = segmentReader;
          }
        }
      }

      // Used for bulk-reading raw bytes for stored fields
      final int[] rawDocLengths = new int[MAX_RAW_MERGE_DOCS];

      // for merging we don't want to compress/uncompress the data, so to tell the FieldsReader that we're
      // in merge mode, we use this FieldSelector
      FieldSelector fieldSelectorMerge = new FieldSelector() {
        public FieldSelectorResult accept(String fieldName) {
          return FieldSelectorResult.LOAD_FOR_MERGE;
        }
      };

      // merge field values
      final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, fieldInfos);

      try {
        for (int i = 0; i < readers.size(); i++) {
          final IndexReader reader = (IndexReader) readers.elementAt(i);
          final SegmentReader matchingSegmentReader = matchingSegmentReaders[i];
          final FieldsReader matchingFieldsReader;
          if (matchingSegmentReader != null)
            matchingFieldsReader = matchingSegmentReader.getFieldsReader();
          else
            matchingFieldsReader = null;
          final int maxDoc = reader.maxDoc();
          for (int j = 0; j < maxDoc;) {
            if (!reader.isDeleted(j)) {               // skip deleted docs
              if (matchingSegmentReader != null) {
                // We can optimize this case (doing a bulk
                // byte copy) since the field numbers are
                // identical
                int start = j;
                int numDocs = 0;
                do {
                  j++;
                  numDocs++;
                } while (j < maxDoc && !matchingSegmentReader.isDeleted(j) && numDocs < MAX_RAW_MERGE_DOCS);

                IndexInput stream = matchingFieldsReader.rawDocs(rawDocLengths, start, numDocs);
                fieldsWriter.addRawDocuments(stream, rawDocLengths, numDocs);
                docCount += numDocs;
                if (checkAbort != null)
                  checkAbort.work(300 * numDocs);
              } else {
                fieldsWriter.addDocument(reader.document(j, fieldSelectorMerge));
                j++;
                docCount++;
                if (checkAbort != null)
                  checkAbort.work(300);
              }
            } else
              j++;
          }
        }
      } finally {
        fieldsWriter.close();
      }

    } else
      // If we are skipping the doc stores, that means there
      // are no deletions in any of these segments, so we
      // just sum numDocs() of each segment to get total docCount
      for (int i = 0; i < readers.size(); i++)
        docCount += ((IndexReader) readers.elementAt(i)).numDocs();

    return docCount;
  }

  /**
   * Merge the TermVectors from each of the segments into the new one.
   * @throws IOException
   */
  private final void mergeVectors() throws IOException {
    TermVectorsWriter termVectorsWriter = new TermVectorsWriter(directory, segment, fieldInfos);

    try {
      for (int r = 0; r < readers.size(); r++) {
        IndexReader reader = (IndexReader) readers.elementAt(r);
        int maxDoc = reader.maxDoc();
        for (int docNum = 0; docNum < maxDoc; docNum++) {
          // skip deleted docs
          if (reader.isDeleted(docNum))
            continue;
          termVectorsWriter.addAllDocVectors(reader.getTermFreqVectors(docNum));
          if (checkAbort != null)
            checkAbort.work(300);
        }
      }
    } finally {
      termVectorsWriter.close();
    }
  }

  private IndexOutput freqOutput = null;
  private IndexOutput proxOutput = null;
  private TermInfosWriter termInfosWriter = null;
  private int skipInterval;
  private int maxSkipLevels;
  private SegmentMergeQueue queue = null;
  private DefaultSkipListWriter skipListWriter = null;

  private final void mergeTerms() throws CorruptIndexException, IOException {
    try {
      freqOutput = directory.createOutput(segment + ".frq");
      proxOutput = directory.createOutput(segment + ".prx");
      termInfosWriter = new TermInfosWriter(directory, segment, fieldInfos, termIndexInterval);
      skipInterval = termInfosWriter.skipInterval;
      maxSkipLevels = termInfosWriter.maxSkipLevels;
      skipListWriter = new DefaultSkipListWriter(skipInterval, maxSkipLevels, mergedDocs, freqOutput, proxOutput);
      queue = new SegmentMergeQueue(readers.size());

      mergeTermInfos();

    } finally {
      if (freqOutput != null)
        freqOutput.close();
      if (proxOutput != null)
        proxOutput.close();
      if (termInfosWriter != null)
        termInfosWriter.close();
      if (queue != null)
        queue.close();
    }
  }

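  /**
   * Merges the term dictionaries of all readers with a k-way merge: each reader's
   * TermEnum is wrapped in a SegmentMergeInfo and placed in a priority queue ordered
   * by term. On every pass the entries positioned on the smallest term are popped,
   * their postings are combined by mergeTermInfo, and each entry is advanced and
   * re-inserted until all enums are exhausted.
   */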
  private final void mergeTermInfos() throws CorruptIndexException, IOException {
    int base = 0;
    for (int i = 0; i < readers.size(); i++) {
      IndexReader reader = (IndexReader) readers.elementAt(i);
      TermEnum termEnum = reader.terms();
      SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader);
      base += reader.numDocs();
      if (smi.next())
        queue.put(smi);                           // initialize queue
      else
        smi.close();
    }

    SegmentMergeInfo[] match = new SegmentMergeInfo[readers.size()];

    while (queue.size() > 0) {
      int matchSize = 0;                          // pop matching terms
      match[matchSize++] = (SegmentMergeInfo) queue.pop();
      Term term = match[0].term;
      SegmentMergeInfo top = (SegmentMergeInfo) queue.top();

      while (top != null && term.compareTo(top.term) == 0) {
        match[matchSize++] = (SegmentMergeInfo) queue.pop();
        top = (SegmentMergeInfo) queue.top();
      }

      final int df = mergeTermInfo(match, matchSize);   // add new TermInfo

      if (checkAbort != null)
        checkAbort.work(df / 3.0);

      while (matchSize > 0) {
        SegmentMergeInfo smi = match[--matchSize];
        if (smi.next())
          queue.put(smi);                         // restore queue
        else
          smi.close();                            // done with a segment
      }
    }
  }

  private final TermInfo termInfo = new TermInfo(); // minimize consing

  /** Merge one term found in one or more segments. The array <code>smis</code>
   *  contains segments that are positioned at the same term. <code>n</code>
   *  is the number of cells in the array actually occupied.
   *
   * @param smis array of segments
   * @param n number of cells in the array actually occupied
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  private final int mergeTermInfo(SegmentMergeInfo[] smis, int n)
      throws CorruptIndexException, IOException {
    long freqPointer = freqOutput.getFilePointer();
    long proxPointer = proxOutput.getFilePointer();

    int df = appendPostings(smis, n);             // append posting data

    long skipPointer = skipListWriter.writeSkip(freqOutput);

    if (df > 0) {
      // add an entry to the dictionary with pointers to prox and freq files
      termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
      termInfosWriter.add(smis[0].term, termInfo);
    }

    return df;
  }

  private byte[] payloadBuffer = null;

  /** Process postings from multiple segments all positioned on the
   *  same term. Writes out merged entries into freqOutput and
   *  the proxOutput streams.
   *
   * @param smis array of segments
   * @param n number of cells in the array actually occupied
   * @return number of documents across all segments where this term was found
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  private final int appendPostings(SegmentMergeInfo[] smis, int n)
      throws CorruptIndexException, IOException {
    int lastDoc = 0;
    int df = 0;                                   // number of docs w/ term
    skipListWriter.resetSkip();
    boolean storePayloads = fieldInfos.fieldInfo(smis[0].term.field).storePayloads;
    int lastPayloadLength = -1;                   // ensures that we write the first length
    for (int i = 0; i < n; i++) {
      SegmentMergeInfo smi = smis[i];
      TermPositions postings = smi.getPositions();
      assert postings != null;
      int base = smi.base;
      int[] docMap = smi.getDocMap();
      postings.seek(smi.termEnum);
      while (postings.next()) {
        int doc = postings.doc();
        if (docMap != null)
          doc = docMap[doc];                      // map around deletions
        doc += base;                              // convert to merged space

        if (doc < 0 || (df > 0 && doc <= lastDoc))
          throw new CorruptIndexException("docs out of order (" + doc + " <= " + lastDoc + " )");

        df++;

        if ((df % skipInterval) == 0) {
          skipListWriter.setSkipData(lastDoc, storePayloads, lastPayloadLength);
          skipListWriter.bufferSkip(df);
        }

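        // Doc/freq encoding, for illustration: the doc delta is shifted left by one
        // and the low bit flags freq == 1. E.g. a delta of 5 with freq == 1 is written
        // as the single VInt 11 (5 << 1 | 1); with freq == 3 we write VInt 10, then VInt 3.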
        int docCode = (doc - lastDoc) << 1;       // use low bit to flag freq=1
        lastDoc = doc;

        int freq = postings.freq();
        if (freq == 1) {
          freqOutput.writeVInt(docCode | 1);      // write doc & freq=1
        } else {
          freqOutput.writeVInt(docCode);          // write doc
          freqOutput.writeVInt(freq);             // write frequency in doc
        }

        /** See {@link DocumentWriter#writePostings(Posting[], String)} for
         *  documentation about the encoding of positions and payloads
         */
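        // Position/payload encoding, for illustration: each position is written as a
        // delta from the previous one. With payloads, the delta is doubled and the low
        // bit flags a payload-length change; e.g. a delta of 4 with an unchanged payload
        // length is written as VInt 8, while a new length of 7 is written as VInt 9
        // followed by VInt 7 and then the payload bytes themselves.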
        int lastPosition = 0;                     // write position deltas
        for (int j = 0; j < freq; j++) {
          int position = postings.nextPosition();
          int delta = position - lastPosition;
          if (storePayloads) {
            int payloadLength = postings.getPayloadLength();
            if (payloadLength == lastPayloadLength) {
              proxOutput.writeVInt(delta * 2);
            } else {
              proxOutput.writeVInt(delta * 2 + 1);
              proxOutput.writeVInt(payloadLength);
              lastPayloadLength = payloadLength;
            }
            if (payloadLength > 0) {
              if (payloadBuffer == null || payloadBuffer.length < payloadLength) {
                payloadBuffer = new byte[payloadLength];
              }
              postings.getPayload(payloadBuffer, 0);
              proxOutput.writeBytes(payloadBuffer, 0, payloadLength);
            }
          } else {
            proxOutput.writeVInt(delta);
          }
          lastPosition = position;
        }
      }
    }
    return df;
  }

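  /**
   * Merge the norms for each indexed field that does not omit norms into the new
   * segment's norms file: the NORMS_HEADER bytes followed, for each such field in
   * turn, by one norm byte per non-deleted document.
   * @throws IOException
   */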
  private void mergeNorms() throws IOException {
    byte[] normBuffer = null;
    IndexOutput output = null;
    try {
      for (int i = 0; i < fieldInfos.size(); i++) {
        FieldInfo fi = fieldInfos.fieldInfo(i);
        if (fi.isIndexed && !fi.omitNorms) {
          if (output == null) {
            output = directory.createOutput(segment + "." + IndexFileNames.NORMS_EXTENSION);
            output.writeBytes(NORMS_HEADER, NORMS_HEADER.length);
          }
          for (int j = 0; j < readers.size(); j++) {
            IndexReader reader = (IndexReader) readers.elementAt(j);
            int maxDoc = reader.maxDoc();
            if (normBuffer == null || normBuffer.length < maxDoc) {
              // the buffer is too small for the current segment
              normBuffer = new byte[maxDoc];
            }
            reader.norms(fi.name, normBuffer, 0);
            if (!reader.hasDeletions()) {
              // optimized case for segments without deleted docs
              output.writeBytes(normBuffer, maxDoc);
            } else {
              // this segment has deleted docs, so we have to
              // check for every doc if it is deleted or not
              for (int k = 0; k < maxDoc; k++) {
                if (!reader.isDeleted(k)) {
                  output.writeByte(normBuffer[k]);
                }
              }
            }
            if (checkAbort != null)
              checkAbort.work(maxDoc);
          }
        }
      }
    } finally {
      if (output != null) {
        output.close();
      }
    }
  }

  final static class CheckAbort {
    private double workCount;
    private MergePolicy.OneMerge merge;
    private Directory dir;

    public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
      this.merge = merge;
      this.dir = dir;
    }

    /**
     * Records the fact that roughly units amount of work
     * have been done since this method was last called.
     * When adding time-consuming code into SegmentMerger,
     * you should test different values for units to ensure
     * that the time in between calls to merge.checkAborted
     * is up to ~ 1 second.
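     * For example, the stored-fields merge above charges roughly 300 units per copied
     * document, so with the 10000-unit threshold used below merge.checkAborted ends up
     * being called about once every 33 documents.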
     */
    public void work(double units) throws MergePolicy.MergeAbortedException {
      workCount += units;
      if (workCount >= 10000.0) {
        merge.checkAborted(dir);
        workCount = 0;
      }
    }
  }
}
|