0001: package org.apache.lucene.index;
0002:
0003: /**
0004: * Licensed to the Apache Software Foundation (ASF) under one or more
0005: * contributor license agreements. See the NOTICE file distributed with
0006: * this work for additional information regarding copyright ownership.
0007: * The ASF licenses this file to You under the Apache License, Version 2.0
0008: * (the "License"); you may not use this file except in compliance with
0009: * the License. You may obtain a copy of the License at
0010: *
0011: * http://www.apache.org/licenses/LICENSE-2.0
0012: *
0013: * Unless required by applicable law or agreed to in writing, software
0014: * distributed under the License is distributed on an "AS IS" BASIS,
0015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0016: * See the License for the specific language governing permissions and
0017: * limitations under the License.
0018: */
0019:
0020: import org.apache.lucene.analysis.Analyzer;
0021: import org.apache.lucene.analysis.Token;
0022: import org.apache.lucene.analysis.TokenStream;
0023: import org.apache.lucene.document.Document;
0024: import org.apache.lucene.document.Fieldable;
0025: import org.apache.lucene.search.Similarity;
0026: import org.apache.lucene.store.Directory;
0027: import org.apache.lucene.store.IndexOutput;
0028: import org.apache.lucene.store.IndexInput;
0029: import org.apache.lucene.store.RAMOutputStream;
0030: import org.apache.lucene.store.AlreadyClosedException;
0031:
0032: import java.io.IOException;
0033: import java.io.PrintStream;
0034: import java.io.Reader;
0035: import java.util.Arrays;
0036: import java.util.List;
0037: import java.util.HashMap;
0038: import java.util.ArrayList;
0039: import java.text.NumberFormat;
0040: import java.util.Collections;
0041:
0042: /**
0043: * This class accepts multiple added documents and directly
0044: * writes a single segment file. It does this more
0045: * efficiently than creating a single segment per document
0046: * (with DocumentWriter) and doing standard merges on those
0047: * segments.
0048: *
0049: * When a document is added, its stored fields (if any) and
0050: * term vectors (if any) are immediately written to the
0051: * Directory (ie these do not consume RAM). The freq/prox
0052: * postings are accumulated into a Postings hash table keyed
0053: * by term. Each entry in this hash table holds a separate
0054: * byte stream (allocated as incrementally growing slices
0055: * into large shared byte[] arrays) for freq and prox, that
0056: * contains the postings data for multiple documents. If
0057: * vectors are enabled, each unique term for each document
0058: * also allocates a PostingVector instance to similarly
0059: * track the offsets & positions byte stream.
0060: *
0061: * Once the Postings hash is full (ie is consuming the
0062: * allowed RAM) or the number of added docs is large enough
0063: * (in the case we are flushing by doc count instead of RAM
0064: * usage), we create a real segment and flush it to disk and
0065: * reset the Postings hash.
0066: *
0067: * In adding a document we first organize all of its fields
0068: * by field name. We then process field by field, and
0069: * record the Posting hash per-field. After each field we
0070: * flush its term vectors. When it's time to flush the full
0071: * segment we first sort the fields by name, and then go
0072: * field by field and sort its postings.
0073: *
0074: *
0075: * Threads:
0076: *
0077: * Multiple threads are allowed into addDocument at once.
0078: * There is an initial synchronized call to getThreadState
0079: * which allocates a ThreadState for this thread. The same
0080: * thread will get the same ThreadState over time (thread
0081: * affinity) so that if there are consistent patterns (for
0082: * example each thread is indexing a different content
0083: * source) then we make better use of RAM. Then
0084: * processDocument is called on that ThreadState without
0085: * synchronization (most of the "heavy lifting" is in this
0086: * call). Finally the synchronized "finishDocument" is
0087: * called to flush changes to the directory.
0088: *
0089: * Each ThreadState instance has its own Posting hash. Once
0090: * we're using too much RAM, we flush all Posting hashes to
0091: * a segment by merging the docIDs in the posting lists for
0092: * the same term across multiple thread states (see
0093: * writeSegment and appendPostings).
0094: *
0095: * When flush is called by IndexWriter, or, we flush
0096: * internally when autoCommit=false, we forcefully idle all
0097: * threads and flush only once they are all idle. This
0098: * means you can call flush with a given thread even while
0099: * other threads are actively adding/deleting documents.
0100: *
0101: *
0102: * Exceptions:
0103: *
0104: * Because this class directly updates in-memory posting
0105: * lists, and flushes stored fields and term vectors
0106: * directly to files in the directory, there are certain
0107: * limited times when an exception can corrupt this state.
0108: * For example, a disk full while flushing stored fields
0109: * leaves this file in a corrupt state. Or, an OOM
0110: * exception while appending to the in-memory posting lists
0111: * can corrupt that posting list. We call such exceptions
0112: * "aborting exceptions". In these cases we must call
0113: * abort() to discard all docs added since the last flush.
0114: *
0115: * All other exceptions ("non-aborting exceptions") can
0116: * still partially update the index structures. These
0117: * updates are consistent, but, they represent only a part
0118: * of the document seen up until the exception was hit.
0119: * When this happens, we immediately mark the document as
0120: * deleted so that the document is always atomically ("all
0121: * or none") added to the index.
0122: */
0123:
0124: final class DocumentsWriter {
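// A rough sketch (for orientation only) of the per-document flow described in
// the class comment above; getThreadState, processDocument and finishDocument
// are the methods named there, and the arguments shown are simplified:
//
//   ThreadState state = getThreadState(...);   // synchronized; same thread -> same state
//   try {
//     state.processDocument(analyzer);         // not synchronized: the heavy lifting
//   } finally {
//     finishDocument(state);                   // synchronized: moves per-doc state to the doc stores
//   }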
0125:
0126: private IndexWriter writer;
0127: private Directory directory;
0128:
0129: private FieldInfos fieldInfos = new FieldInfos(); // All fields we've seen
0130: private IndexOutput tvx, tvf, tvd; // To write term vectors
0131: private FieldsWriter fieldsWriter; // To write stored fields
0132:
0133: private String segment; // Current segment we are working on
0134: private String docStoreSegment; // Current doc-store segment we are writing
0135: private int docStoreOffset; // Current starting doc-store offset of current segment
0136:
0137: private int nextDocID; // Next docID to be added
0138: private int numDocsInRAM; // # docs buffered in RAM
0139: private int numDocsInStore; // # docs written to doc stores
0140: private int nextWriteDocID; // Next docID to be written
0141:
0142: // Max # ThreadState instances; if there are more threads
0143: // than this they share ThreadStates
0144: private final static int MAX_THREAD_STATE = 5;
0145: private ThreadState[] threadStates = new ThreadState[0];
0146: private final HashMap threadBindings = new HashMap();
0147: private int numWaiting;
0148: private final ThreadState[] waitingThreadStates = new ThreadState[MAX_THREAD_STATE];
0149: private int pauseThreads; // Non-zero when we need all threads to
0150: // pause (eg to flush)
0151: private boolean flushPending; // True when a thread has decided to flush
0152: private boolean bufferIsFull; // True when it's time to write segment
0153: private int abortCount; // Non-zero while abort is pending or running
0154:
0155: private PrintStream infoStream;
0156:
0157: // This Hashmap buffers delete terms in ram before they
0158: // are applied. The key is delete term; the value is
0159: // number of buffered documents the term applies to.
0160: private HashMap bufferedDeleteTerms = new HashMap();
0161: private int numBufferedDeleteTerms = 0;
0162:
0163: // Currently used only for deleting a doc on hitting a non-aborting exception
0164: private List bufferedDeleteDocIDs = new ArrayList();
0165:
0166: // The max number of delete terms that can be buffered before
0167: // they must be flushed to disk.
0168: private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
0169:
0170: // How much RAM we can use before flushing. This is 0 if
0171: // we are flushing by doc count instead.
0172: private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024);
0173:
0174: // Flush @ this number of docs. If ramBufferSize is
0175: // non-zero we will flush by RAM usage instead.
0176: private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
0177:
0178: private boolean closed;
0179:
0180: // Coarse estimates used to measure RAM usage of buffered deletes
0181: private static int OBJECT_HEADER_BYTES = 8;
0182: private static int OBJECT_POINTER_BYTES = 4; // TODO: should be 8 on 64-bit platform
0183: private static int BYTES_PER_CHAR = 2;
0184: private static int BYTES_PER_INT = 4;
0185:
0186: private BufferedNorms[] norms = new BufferedNorms[0]; // Holds norms until we flush
0187:
0188: DocumentsWriter(Directory directory, IndexWriter writer)
0189: throws IOException {
0190: this.directory = directory;
0191: this.writer = writer;
0192:
0193: postingsFreeList = new Posting[0];
0194: }
0195:
0196: /** If non-null, various details of indexing are printed
0197: * here. */
0198: void setInfoStream(PrintStream infoStream) {
0199: this.infoStream = infoStream;
0200: }
0201:
0202: /** Set how much RAM we can use before flushing. */
0203: void setRAMBufferSizeMB(double mb) {
0204: if (mb == IndexWriter.DISABLE_AUTO_FLUSH) {
0205: ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
0206: } else {
0207: ramBufferSize = (long) (mb * 1024 * 1024);
0208: }
0209: }
0210:
0211: double getRAMBufferSizeMB() {
0212: if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH) {
0213: return ramBufferSize;
0214: } else {
0215: return ramBufferSize / 1024. / 1024.;
0216: }
0217: }
0218:
0219: /** Set max buffered docs, which means we will flush by
0220: * doc count instead of by RAM usage. */
0221: void setMaxBufferedDocs(int count) {
0222: maxBufferedDocs = count;
0223: }
0224:
0225: int getMaxBufferedDocs() {
0226: return maxBufferedDocs;
0227: }
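// Illustrative only (the caller-side usage below is assumed, not code from this
// class): flushing is driven either by RAM usage or by buffered doc count,
// depending on which of the two settings above is in effect, e.g.:
//
//   docWriter.setRAMBufferSizeMB(16.0);  // flush once ~16 MB of buffered postings RAM is used
//   docWriter.setMaxBufferedDocs(1000);  // or: flush every 1000 buffered docs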
0228:
0229: /** Get current segment name we are writing. */
0230: String getSegment() {
0231: return segment;
0232: }
0233:
0234: /** Returns how many docs are currently buffered in RAM. */
0235: int getNumDocsInRAM() {
0236: return numDocsInRAM;
0237: }
0238:
0239: /** Returns the current doc store segment we are writing
0240: * to. This will be the same as segment when autoCommit
0241: * is true. */
0242: String getDocStoreSegment() {
0243: return docStoreSegment;
0244: }
0245:
0246: /** Returns the doc offset into the shared doc store for
0247: * the current buffered docs. */
0248: int getDocStoreOffset() {
0249: return docStoreOffset;
0250: }
0251:
0252: /** Closes the currently open doc stores and returns the doc
0253: * store segment name. This returns null if there are
0254: * no buffered documents. */
0255: String closeDocStore() throws IOException {
0256:
0257: assert allThreadsIdle();
0258:
0259: List flushedFiles = files();
0260:
0261: if (infoStream != null)
0262: infoStream.println("\ncloseDocStore: "
0263: + flushedFiles.size()
0264: + " files to flush to segment " + docStoreSegment
0265: + " numDocs=" + numDocsInStore);
0266:
0267: if (flushedFiles.size() > 0) {
0268: files = null;
0269:
0270: if (tvx != null) {
0271: // At least one doc in this run had term vectors enabled
0272: assert docStoreSegment != null;
0273: tvx.close();
0274: tvf.close();
0275: tvd.close();
0276: tvx = null;
0277: }
0278:
0279: if (fieldsWriter != null) {
0280: assert docStoreSegment != null;
0281: fieldsWriter.close();
0282: fieldsWriter = null;
0283: }
0284:
0285: String s = docStoreSegment;
0286: docStoreSegment = null;
0287: docStoreOffset = 0;
0288: numDocsInStore = 0;
0289: return s;
0290: } else {
0291: return null;
0292: }
0293: }
0294:
0295: private List files = null; // Cached list of files we've created
0296: private List abortedFiles = null; // List of files that were written before last abort()
0297:
0298: List abortedFiles() {
0299: return abortedFiles;
0300: }
0301:
0302: /* Returns list of files in use by this instance,
0303: * including any flushed segments. */
0304: synchronized List files() {
0305:
0306: if (files != null)
0307: return files;
0308:
0309: files = new ArrayList();
0310:
0311: // Stored fields:
0312: if (fieldsWriter != null) {
0313: assert docStoreSegment != null;
0314: files.add(docStoreSegment + "."
0315: + IndexFileNames.FIELDS_EXTENSION);
0316: files.add(docStoreSegment + "."
0317: + IndexFileNames.FIELDS_INDEX_EXTENSION);
0318: }
0319:
0320: // Vectors:
0321: if (tvx != null) {
0322: assert docStoreSegment != null;
0323: files.add(docStoreSegment + "."
0324: + IndexFileNames.VECTORS_INDEX_EXTENSION);
0325: files.add(docStoreSegment + "."
0326: + IndexFileNames.VECTORS_FIELDS_EXTENSION);
0327: files.add(docStoreSegment + "."
0328: + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
0329: }
0330:
0331: return files;
0332: }
0333:
0334: synchronized void setAborting() {
0335: abortCount++;
0336: }
0337:
0338: /** Called if we hit an exception when adding docs,
0339: * flushing, etc. This resets our state, discarding any
0340: * docs added since last flush. If ae is non-null, it
0341: * contains the root cause exception (which we re-throw
0342: * after we are done aborting). */
0343: synchronized void abort(AbortException ae) throws IOException {
0344:
0345: // Anywhere that throws an AbortException must first
0346: // mark aborting to make sure while the exception is
0347: // unwinding the un-synchronized stack, no thread grabs
0348: // the corrupt ThreadState that hit the aborting
0349: // exception:
0350: assert ae == null || abortCount > 0;
0351:
0352: try {
0353:
0354: if (infoStream != null)
0355: infoStream.println("docWriter: now abort");
0356:
0357: // Forcefully remove waiting ThreadStates from line
0358: for (int i = 0; i < numWaiting; i++)
0359: waitingThreadStates[i].isIdle = true;
0360: numWaiting = 0;
0361:
0362: // Wait for all other threads to finish with DocumentsWriter:
0363: pauseAllThreads();
0364:
0365: assert 0 == numWaiting;
0366:
0367: try {
0368:
0369: bufferedDeleteTerms.clear();
0370: bufferedDeleteDocIDs.clear();
0371: numBufferedDeleteTerms = 0;
0372:
0373: try {
0374: abortedFiles = files();
0375: } catch (Throwable t) {
0376: abortedFiles = null;
0377: }
0378:
0379: docStoreSegment = null;
0380: numDocsInStore = 0;
0381: docStoreOffset = 0;
0382: files = null;
0383:
0384: // Clear vectors & fields from ThreadStates
0385: for (int i = 0; i < threadStates.length; i++) {
0386: ThreadState state = threadStates[i];
0387: state.tvfLocal.reset();
0388: state.fdtLocal.reset();
0389: if (state.localFieldsWriter != null) {
0390: try {
0391: state.localFieldsWriter.close();
0392: } catch (Throwable t) {
0393: }
0394: state.localFieldsWriter = null;
0395: }
0396: }
0397:
0398: // Reset vectors writer
0399: if (tvx != null) {
0400: try {
0401: tvx.close();
0402: } catch (Throwable t) {
0403: }
0404: tvx = null;
0405: }
0406: if (tvd != null) {
0407: try {
0408: tvd.close();
0409: } catch (Throwable t) {
0410: }
0411: tvd = null;
0412: }
0413: if (tvf != null) {
0414: try {
0415: tvf.close();
0416: } catch (Throwable t) {
0417: }
0418: tvf = null;
0419: }
0420:
0421: // Reset fields writer
0422: if (fieldsWriter != null) {
0423: try {
0424: fieldsWriter.close();
0425: } catch (Throwable t) {
0426: }
0427: fieldsWriter = null;
0428: }
0429:
0430: // Discard pending norms:
0431: final int numField = fieldInfos.size();
0432: for (int i = 0; i < numField; i++) {
0433: FieldInfo fi = fieldInfos.fieldInfo(i);
0434: if (fi.isIndexed && !fi.omitNorms) {
0435: BufferedNorms n = norms[i];
0436: if (n != null)
0437: try {
0438: n.reset();
0439: } catch (Throwable t) {
0440: }
0441: }
0442: }
0443:
0444: // Reset all postings data
0445: resetPostingsData();
0446:
0447: } finally {
0448: resumeAllThreads();
0449: }
0450:
0451: // If we have a root cause exception, re-throw it now:
0452: if (ae != null) {
0453: Throwable t = ae.getCause();
0454: if (t instanceof IOException)
0455: throw (IOException) t;
0456: else if (t instanceof RuntimeException)
0457: throw (RuntimeException) t;
0458: else if (t instanceof Error)
0459: throw (Error) t;
0460: else
0461: // Should not get here
0462: assert false : "unknown exception: " + t;
0463: }
0464: } finally {
0465: if (ae != null)
0466: abortCount--;
0467: notifyAll();
0468: }
0469: }
0470:
0471: /** Reset after a flush */
0472: private void resetPostingsData() throws IOException {
0473: // All ThreadStates should be idle when we are called
0474: assert allThreadsIdle();
0475: threadBindings.clear();
0476: segment = null;
0477: numDocsInRAM = 0;
0478: nextDocID = 0;
0479: nextWriteDocID = 0;
0480: files = null;
0481: balanceRAM();
0482: bufferIsFull = false;
0483: flushPending = false;
0484: for (int i = 0; i < threadStates.length; i++) {
0485: threadStates[i].numThreads = 0;
0486: threadStates[i].resetPostings();
0487: }
0488: numBytesUsed = 0;
0489: }
0490:
0491: // Returns true if an abort is in progress
0492: synchronized boolean pauseAllThreads() {
0493: pauseThreads++;
0494: while (!allThreadsIdle()) {
0495: try {
0496: wait();
0497: } catch (InterruptedException e) {
0498: Thread.currentThread().interrupt();
0499: }
0500: }
0501: return abortCount > 0;
0502: }
0503:
0504: synchronized void resumeAllThreads() {
0505: pauseThreads--;
0506: assert pauseThreads >= 0;
0507: if (0 == pauseThreads)
0508: notifyAll();
0509: }
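// pauseAllThreads()/resumeAllThreads() bracket operations that need every
// ThreadState idle, following the pattern abort() uses above:
//
//   pauseAllThreads();
//   try {
//     ... // all ThreadStates are idle here
//   } finally {
//     resumeAllThreads();
//   }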
0510:
0511: private synchronized boolean allThreadsIdle() {
0512: for (int i = 0; i < threadStates.length; i++)
0513: if (!threadStates[i].isIdle)
0514: return false;
0515: return true;
0516: }
0517:
0518: private boolean hasNorms; // Whether any norms were seen since last flush
0519:
0520: List newFiles;
0521:
0522: /** Flush all pending docs to a new segment */
0523: synchronized int flush(boolean closeDocStore) throws IOException {
0524:
0525: assert allThreadsIdle();
0526:
0527: if (segment == null)
0528: // In case we are asked to flush an empty segment
0529: segment = writer.newSegmentName();
0530:
0531: newFiles = new ArrayList();
0532:
0533: docStoreOffset = numDocsInStore;
0534:
0535: int docCount;
0536:
0537: assert numDocsInRAM > 0;
0538:
0539: if (infoStream != null)
0540: infoStream.println("\nflush postings as segment " + segment
0541: + " numDocs=" + numDocsInRAM);
0542:
0543: boolean success = false;
0544:
0545: try {
0546:
0547: if (closeDocStore) {
0548: assert docStoreSegment != null;
0549: assert docStoreSegment.equals(segment);
0550: newFiles.addAll(files());
0551: closeDocStore();
0552: }
0553:
0554: fieldInfos.write(directory, segment + ".fnm");
0555:
0556: docCount = numDocsInRAM;
0557:
0558: newFiles.addAll(writeSegment());
0559:
0560: success = true;
0561:
0562: } finally {
0563: if (!success)
0564: abort(null);
0565: }
0566:
0567: return docCount;
0568: }
0569:
0570: /** Build compound file for the segment we just flushed */
0571: void createCompoundFile(String segment) throws IOException {
0572: CompoundFileWriter cfsWriter = new CompoundFileWriter(
0573: directory, segment + "."
0574: + IndexFileNames.COMPOUND_FILE_EXTENSION);
0575: final int size = newFiles.size();
0576: for (int i = 0; i < size; i++)
0577: cfsWriter.addFile((String) newFiles.get(i));
0578:
0579: // Perform the merge
0580: cfsWriter.close();
0581: }
0582:
0583: /** Sets flushPending if it is not already set and returns
0584: * whether it was set. This is used by IndexWriter to
0585: * trigger a single flush even when multiple threads are
0586: * trying to do so. */
0587: synchronized boolean setFlushPending() {
0588: if (flushPending)
0589: return false;
0590: else {
0591: flushPending = true;
0592: return true;
0593: }
0594: }
0595:
0596: synchronized void clearFlushPending() {
0597: flushPending = false;
0598: }
0599:
0600: /** Per-thread state. We keep a separate Posting hash and
0601: * other state for each thread and then merge postings
0602: * hashes from all threads when writing the segment. */
0603: private final class ThreadState {
0604:
0605: Posting[] postingsFreeList; // Free Posting instances
0606: int postingsFreeCount;
0607:
0608: RAMOutputStream tvfLocal = new RAMOutputStream(); // Term vectors for one doc
0609: RAMOutputStream fdtLocal = new RAMOutputStream(); // Stored fields for one doc
0610: FieldsWriter localFieldsWriter; // Fields for one doc
0611:
0612: long[] vectorFieldPointers;
0613: int[] vectorFieldNumbers;
0614:
0615: boolean isIdle = true; // Whether we are in use
0616: int numThreads = 1; // Number of threads that use this instance
0617:
0618: int docID; // docID we are now working on
0619: int numStoredFields; // How many stored fields in current doc
0620: float docBoost; // Boost for current doc
0621:
0622: FieldData[] fieldDataArray; // Fields touched by current doc
0623: int numFieldData; // How many fields in current doc
0624: int numVectorFields; // How many vector fields in current doc
0625:
0626: FieldData[] allFieldDataArray = new FieldData[10]; // All FieldData instances
0627: int numAllFieldData;
0628: FieldData[] fieldDataHash; // Hash FieldData instances by field name
0629: int fieldDataHashMask;
0630: String maxTermPrefix; // Non-null prefix of a too-large term if this
0631: // doc has one
0632:
0633: boolean doFlushAfter;
0634:
0635: public ThreadState() {
0636: fieldDataArray = new FieldData[8];
0637:
0638: fieldDataHash = new FieldData[16];
0639: fieldDataHashMask = 15;
0640:
0641: vectorFieldPointers = new long[10];
0642: vectorFieldNumbers = new int[10];
0643: postingsFreeList = new Posting[256];
0644: postingsFreeCount = 0;
0645: }
0646:
0647: /** Clear the postings hash and return objects back to
0648: * shared pool */
0649: public void resetPostings() throws IOException {
0650: fieldGen = 0;
0651: maxPostingsVectors = 0;
0652: doFlushAfter = false;
0653: if (localFieldsWriter != null) {
0654: localFieldsWriter.close();
0655: localFieldsWriter = null;
0656: }
0657: postingsPool.reset();
0658: charPool.reset();
0659: recyclePostings(postingsFreeList, postingsFreeCount);
0660: postingsFreeCount = 0;
0661: for (int i = 0; i < numAllFieldData; i++) {
0662: FieldData fp = allFieldDataArray[i];
0663: fp.lastGen = -1;
0664: if (fp.numPostings > 0)
0665: fp.resetPostingArrays();
0666: }
0667: }
0668:
0669: /** Move all per-document state that was accumulated in
0670: * the ThreadState into the "real" stores. */
0671: public void writeDocument() throws IOException, AbortException {
0672:
0673: // If we hit an exception while appending to the
0674: // stored fields or term vectors files, we have to
0675: // abort all documents since we last flushed because
0676: // it means those files are possibly inconsistent.
0677: try {
0678:
0679: numDocsInStore++;
0680:
0681: // Append stored fields to the real FieldsWriter:
0682: fieldsWriter.flushDocument(numStoredFields, fdtLocal);
0683: fdtLocal.reset();
0684:
0685: // Append term vectors to the real outputs:
0686: if (tvx != null) {
0687: tvx.writeLong(tvd.getFilePointer());
0688: tvd.writeVInt(numVectorFields);
0689: if (numVectorFields > 0) {
0690: for (int i = 0; i < numVectorFields; i++)
0691: tvd.writeVInt(vectorFieldNumbers[i]);
0692: assert 0 == vectorFieldPointers[0];
0693: tvd.writeVLong(tvf.getFilePointer());
0694: long lastPos = vectorFieldPointers[0];
0695: for (int i = 1; i < numVectorFields; i++) {
0696: long pos = vectorFieldPointers[i];
0697: tvd.writeVLong(pos - lastPos);
0698: lastPos = pos;
0699: }
0700: tvfLocal.writeTo(tvf);
0701: tvfLocal.reset();
0702: }
0703: }
0704:
0705: // Append norms for the fields we saw:
0706: for (int i = 0; i < numFieldData; i++) {
0707: FieldData fp = fieldDataArray[i];
0708: if (fp.doNorms) {
0709: BufferedNorms bn = norms[fp.fieldInfo.number];
0710: assert bn != null;
0711: assert bn.upto <= docID;
0712: bn.fill(docID);
0713: float norm = fp.boost
0714: * writer.getSimilarity().lengthNorm(
0715: fp.fieldInfo.name, fp.length);
0716: bn.add(norm);
0717: }
0718: }
0719: } catch (Throwable t) {
0720: // Forcefully idle this threadstate -- its state will
0721: // be reset by abort()
0722: isIdle = true;
0723: throw new AbortException(t, DocumentsWriter.this);
0724: }
0725:
0726: if (bufferIsFull && !flushPending) {
0727: flushPending = true;
0728: doFlushAfter = true;
0729: }
0730: }
0731:
0732: int fieldGen;
0733:
0734: /** Initializes shared state for this new document */
0735: void init(Document doc, int docID) throws IOException,
0736: AbortException {
0737:
0738: assert !isIdle;
0739:
0740: this.docID = docID;
0741: docBoost = doc.getBoost();
0742: numStoredFields = 0;
0743: numFieldData = 0;
0744: numVectorFields = 0;
0745: maxTermPrefix = null;
0746:
0747: assert 0 == fdtLocal.length();
0748: assert 0 == fdtLocal.getFilePointer();
0749: assert 0 == tvfLocal.length();
0750: assert 0 == tvfLocal.getFilePointer();
0751: final int thisFieldGen = fieldGen++;
0752:
0753: List docFields = doc.getFields();
0754: final int numDocFields = docFields.size();
0755: boolean docHasVectors = false;
0756:
0757: // Absorb any new fields first seen in this document.
0758: // Also absorb any changes to fields we had already
0759: // seen before (eg suddenly turning on norms or
0760: // vectors, etc.):
0761:
0762: for (int i = 0; i < numDocFields; i++) {
0763: Fieldable field = (Fieldable) docFields.get(i);
0764:
0765: FieldInfo fi = fieldInfos.add(field.name(), field
0766: .isIndexed(), field.isTermVectorStored(), field
0767: .isStorePositionWithTermVector(), field
0768: .isStoreOffsetWithTermVector(), field
0769: .getOmitNorms(), false);
0770: if (fi.isIndexed && !fi.omitNorms) {
0771: // Maybe grow our buffered norms
0772: if (norms.length <= fi.number) {
0773: int newSize = (int) ((1 + fi.number) * 1.25);
0774: BufferedNorms[] newNorms = new BufferedNorms[newSize];
0775: System.arraycopy(norms, 0, newNorms, 0,
0776: norms.length);
0777: norms = newNorms;
0778: }
0779:
0780: if (norms[fi.number] == null)
0781: norms[fi.number] = new BufferedNorms();
0782:
0783: hasNorms = true;
0784: }
0785:
0786: // Make sure we have a FieldData allocated
0787: int hashPos = fi.name.hashCode() & fieldDataHashMask;
0788: FieldData fp = fieldDataHash[hashPos];
0789: while (fp != null && !fp.fieldInfo.name.equals(fi.name))
0790: fp = fp.next;
0791:
0792: if (fp == null) {
0793:
0794: fp = new FieldData(fi);
0795: fp.next = fieldDataHash[hashPos];
0796: fieldDataHash[hashPos] = fp;
0797:
0798: if (numAllFieldData == allFieldDataArray.length) {
0799: int newSize = (int) (allFieldDataArray.length * 1.5);
0800: int newHashSize = fieldDataHash.length * 2;
0801:
0802: FieldData newArray[] = new FieldData[newSize];
0803: FieldData newHashArray[] = new FieldData[newHashSize];
0804: System.arraycopy(allFieldDataArray, 0,
0805: newArray, 0, numAllFieldData);
0806:
0807: // Rehash
0808: fieldDataHashMask = newHashSize - 1;
0809: for (int j = 0; j < fieldDataHash.length; j++) {
0810: FieldData fp0 = fieldDataHash[j];
0811: while (fp0 != null) {
0812: hashPos = fp0.fieldInfo.name.hashCode()
0813: & fieldDataHashMask;
0814: FieldData nextFP0 = fp0.next;
0815: fp0.next = newHashArray[hashPos];
0816: newHashArray[hashPos] = fp0;
0817: fp0 = nextFP0;
0818: }
0819: }
0820:
0821: allFieldDataArray = newArray;
0822: fieldDataHash = newHashArray;
0823: }
0824: allFieldDataArray[numAllFieldData++] = fp;
0825: } else {
0826: assert fp.fieldInfo == fi;
0827: }
0828:
0829: if (thisFieldGen != fp.lastGen) {
0830:
0831: // First time we're seeing this field for this doc
0832: fp.lastGen = thisFieldGen;
0833: fp.fieldCount = 0;
0834: fp.doVectors = fp.doVectorPositions = fp.doVectorOffsets = false;
0835: fp.doNorms = fi.isIndexed && !fi.omitNorms;
0836:
0837: if (numFieldData == fieldDataArray.length) {
0838: int newSize = fieldDataArray.length * 2;
0839: FieldData newArray[] = new FieldData[newSize];
0840: System.arraycopy(fieldDataArray, 0, newArray,
0841: 0, numFieldData);
0842: fieldDataArray = newArray;
0843:
0844: }
0845: fieldDataArray[numFieldData++] = fp;
0846: }
0847:
0848: if (field.isTermVectorStored()) {
0849: if (!fp.doVectors
0850: && numVectorFields++ == vectorFieldPointers.length) {
0851: final int newSize = (int) (numVectorFields * 1.5);
0852: vectorFieldPointers = new long[newSize];
0853: vectorFieldNumbers = new int[newSize];
0854: }
0855: fp.doVectors = true;
0856: docHasVectors = true;
0857:
0858: fp.doVectorPositions |= field
0859: .isStorePositionWithTermVector();
0860: fp.doVectorOffsets |= field
0861: .isStoreOffsetWithTermVector();
0862: }
0863:
0864: if (fp.fieldCount == fp.docFields.length) {
0865: Fieldable[] newArray = new Fieldable[fp.docFields.length * 2];
0866: System.arraycopy(fp.docFields, 0, newArray, 0,
0867: fp.docFields.length);
0868: fp.docFields = newArray;
0869: }
0870:
0871: // Lazily allocate arrays for postings:
0872: if (field.isIndexed() && fp.postingsHash == null)
0873: fp.initPostingArrays();
0874:
0875: fp.docFields[fp.fieldCount++] = field;
0876: }
0877:
0878: // Maybe init the local & global fieldsWriter
0879: if (localFieldsWriter == null) {
0880: if (fieldsWriter == null) {
0881: assert docStoreSegment == null;
0882: assert segment != null;
0883: docStoreSegment = segment;
0884: // If we hit an exception while init'ing the
0885: // fieldsWriter, we must abort this segment
0886: // because those files will be in an unknown
0887: // state:
0888: try {
0889: fieldsWriter = new FieldsWriter(directory,
0890: docStoreSegment, fieldInfos);
0891: } catch (Throwable t) {
0892: throw new AbortException(t,
0893: DocumentsWriter.this);
0894: }
0895: files = null;
0896: }
0897: localFieldsWriter = new FieldsWriter(null, fdtLocal,
0898: fieldInfos);
0899: }
0900:
0901: // First time we see a doc that has field(s) with
0902: // stored vectors, we init our tvx writer
0903: if (docHasVectors) {
0904: if (tvx == null) {
0905: assert docStoreSegment != null;
0906: // If we hit an exception while init'ing the term
0907: // vector output files, we must abort this segment
0908: // because those files will be in an unknown
0909: // state:
0910: try {
0911: tvx = directory
0912: .createOutput(docStoreSegment
0913: + "."
0914: + IndexFileNames.VECTORS_INDEX_EXTENSION);
0915: tvx.writeInt(TermVectorsReader.FORMAT_VERSION);
0916: tvd = directory
0917: .createOutput(docStoreSegment
0918: + "."
0919: + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
0920: tvd.writeInt(TermVectorsReader.FORMAT_VERSION);
0921: tvf = directory
0922: .createOutput(docStoreSegment
0923: + "."
0924: + IndexFileNames.VECTORS_FIELDS_EXTENSION);
0925: tvf.writeInt(TermVectorsReader.FORMAT_VERSION);
0926:
0927: // We must "catch up" for all docs before us
0928: // that had no vectors:
0929: for (int i = 0; i < numDocsInStore; i++) {
0930: tvx.writeLong(tvd.getFilePointer());
0931: tvd.writeVInt(0);
0932: }
0933:
0934: } catch (Throwable t) {
0935: throw new AbortException(t,
0936: DocumentsWriter.this);
0937: }
0938: files = null;
0939: }
0940:
0941: numVectorFields = 0;
0942: }
0943: }
0944:
0945: /** Do in-place sort of Posting array */
0946: void doPostingSort(Posting[] postings, int numPosting) {
0947: quickSort(postings, 0, numPosting - 1);
0948: }
0949:
0950: void quickSort(Posting[] postings, int lo, int hi) {
0951: if (lo >= hi)
0952: return;
0953:
0954: int mid = (lo + hi) >>> 1;
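// Median-of-three pivot selection: the swaps below order postings[lo],
// postings[mid], postings[hi], leaving their median in postings[mid] to be
// used as the partition value.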
0955:
0956: if (comparePostings(postings[lo], postings[mid]) > 0) {
0957: Posting tmp = postings[lo];
0958: postings[lo] = postings[mid];
0959: postings[mid] = tmp;
0960: }
0961:
0962: if (comparePostings(postings[mid], postings[hi]) > 0) {
0963: Posting tmp = postings[mid];
0964: postings[mid] = postings[hi];
0965: postings[hi] = tmp;
0966:
0967: if (comparePostings(postings[lo], postings[mid]) > 0) {
0968: Posting tmp2 = postings[lo];
0969: postings[lo] = postings[mid];
0970: postings[mid] = tmp2;
0971: }
0972: }
0973:
0974: int left = lo + 1;
0975: int right = hi - 1;
0976:
0977: if (left >= right)
0978: return;
0979:
0980: Posting partition = postings[mid];
0981:
0982: for (;;) {
0983: while (comparePostings(postings[right], partition) > 0)
0984: --right;
0985:
0986: while (left < right
0987: && comparePostings(postings[left], partition) <= 0)
0988: ++left;
0989:
0990: if (left < right) {
0991: Posting tmp = postings[left];
0992: postings[left] = postings[right];
0993: postings[right] = tmp;
0994: --right;
0995: } else {
0996: break;
0997: }
0998: }
0999:
1000: quickSort(postings, lo, left);
1001: quickSort(postings, left + 1, hi);
1002: }
1003:
1004: /** Do in-place sort of PostingVector array */
1005: void doVectorSort(PostingVector[] postings, int numPosting) {
1006: quickSort(postings, 0, numPosting - 1);
1007: }
1008:
1009: void quickSort(PostingVector[] postings, int lo, int hi) {
1010: if (lo >= hi)
1011: return;
1012:
1013: int mid = (lo + hi) >>> 1;
1014:
1015: if (comparePostings(postings[lo].p, postings[mid].p) > 0) {
1016: PostingVector tmp = postings[lo];
1017: postings[lo] = postings[mid];
1018: postings[mid] = tmp;
1019: }
1020:
1021: if (comparePostings(postings[mid].p, postings[hi].p) > 0) {
1022: PostingVector tmp = postings[mid];
1023: postings[mid] = postings[hi];
1024: postings[hi] = tmp;
1025:
1026: if (comparePostings(postings[lo].p, postings[mid].p) > 0) {
1027: PostingVector tmp2 = postings[lo];
1028: postings[lo] = postings[mid];
1029: postings[mid] = tmp2;
1030: }
1031: }
1032:
1033: int left = lo + 1;
1034: int right = hi - 1;
1035:
1036: if (left >= right)
1037: return;
1038:
1039: PostingVector partition = postings[mid];
1040:
1041: for (;;) {
1042: while (comparePostings(postings[right].p, partition.p) > 0)
1043: --right;
1044:
1045: while (left < right
1046: && comparePostings(postings[left].p,
1047: partition.p) <= 0)
1048: ++left;
1049:
1050: if (left < right) {
1051: PostingVector tmp = postings[left];
1052: postings[left] = postings[right];
1053: postings[right] = tmp;
1054: --right;
1055: } else {
1056: break;
1057: }
1058: }
1059:
1060: quickSort(postings, lo, left);
1061: quickSort(postings, left + 1, hi);
1062: }
1063:
1064: /** If there are fields we've seen but did not see again
1065: * in the last run, then free them up. Also reduce
1066: * postings hash size. */
1067: void trimFields() {
1068:
1069: int upto = 0;
1070: for (int i = 0; i < numAllFieldData; i++) {
1071: FieldData fp = allFieldDataArray[i];
1072: if (fp.lastGen == -1) {
1073: // This field was not seen since the previous
1074: // flush, so, free up its resources now
1075:
1076: // Unhash
1077: final int hashPos = fp.fieldInfo.name.hashCode()
1078: & fieldDataHashMask;
1079: FieldData last = null;
1080: FieldData fp0 = fieldDataHash[hashPos];
1081: while (fp0 != fp) {
1082: last = fp0;
1083: fp0 = fp0.next;
1084: }
1085: assert fp0 != null;
1086:
1087: if (last == null)
1088: fieldDataHash[hashPos] = fp.next;
1089: else
1090: last.next = fp.next;
1091:
1092: if (infoStream != null)
1093: infoStream.println(" remove field="
1094: + fp.fieldInfo.name);
1095:
1096: } else {
1097: // Reset
1098: fp.lastGen = -1;
1099: allFieldDataArray[upto++] = fp;
1100:
1101: if (fp.numPostings > 0
1102: && ((float) fp.numPostings)
1103: / fp.postingsHashSize < 0.2) {
1104: int hashSize = fp.postingsHashSize;
1105:
1106: // Reduce hash so it's between 25-50% full
1107: while (fp.numPostings < (hashSize >> 1)
1108: && hashSize >= 2)
1109: hashSize >>= 1;
1110: hashSize <<= 1;
1111:
1112: if (hashSize != fp.postingsHash.length)
1113: fp.rehashPostings(hashSize);
1114: }
1115: }
1116: }
1117:
1118: // If we didn't see any norms for this field since
1119: // last flush, free it
1120: for (int i = 0; i < norms.length; i++) {
1121: BufferedNorms n = norms[i];
1122: if (n != null && n.upto == 0)
1123: norms[i] = null;
1124: }
1125:
1126: numAllFieldData = upto;
1127:
1128: // Also pare back PostingsVectors if it's excessively
1129: // large
1130: if (maxPostingsVectors * 1.5 < postingsVectors.length) {
1131: final int newSize;
1132: if (0 == maxPostingsVectors)
1133: newSize = 1;
1134: else
1135: newSize = (int) (1.5 * maxPostingsVectors);
1136: PostingVector[] newArray = new PostingVector[newSize];
1137: System.arraycopy(postingsVectors, 0, newArray, 0,
1138: newSize);
1139: postingsVectors = newArray;
1140: }
1141: }
1142:
1143: /** Tokenizes the fields of a document into Postings */
1144: void processDocument(Analyzer analyzer) throws IOException,
1145: AbortException {
1146:
1147: final int numFields = numFieldData;
1148:
1149: assert 0 == fdtLocal.length();
1150:
1151: if (tvx != null)
1152: // If we are writing vectors then we must visit
1153: // fields in sorted order so they are written in
1154: // sorted order. TODO: we actually only need to
1155: // sort the subset of fields that have vectors
1156: // enabled; we could save [small amount of] CPU
1157: // here.
1158: Arrays.sort(fieldDataArray, 0, numFields);
1159:
1160: // We process the document one field at a time
1161: for (int i = 0; i < numFields; i++)
1162: fieldDataArray[i].processField(analyzer);
1163:
1164: if (maxTermPrefix != null && infoStream != null)
1165: infoStream
1166: .println("WARNING: document contains at least one immense term (longer than the max length "
1167: + MAX_TERM_LENGTH
1168: + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '"
1169: + maxTermPrefix + "...'");
1170:
1171: if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
1172: && numBytesUsed > 0.95 * ramBufferSize)
1173: balanceRAM();
1174: }
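// The pools declared below hold the large shared byte[]/char[] blocks described
// in the class comment: freq/prox postings, term-vector positions/offsets and
// term text are all written as incrementally grown slices inside these blocks.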
1175:
1176: final ByteBlockPool postingsPool = new ByteBlockPool();
1177: final ByteBlockPool vectorsPool = new ByteBlockPool();
1178: final CharBlockPool charPool = new CharBlockPool();
1179:
1180: // Current posting we are working on
1181: Posting p;
1182: PostingVector vector;
1183:
1184: // USE ONLY FOR DEBUGGING!
1185: /*
1186: public String getPostingText() {
1187: char[] text = charPool.buffers[p.textStart >> CHAR_BLOCK_SHIFT];
1188: int upto = p.textStart & CHAR_BLOCK_MASK;
1189: while(text[upto] != 0xffff)
1190: upto++;
1191: return new String(text, p.textStart, upto-(p.textStart & BYTE_BLOCK_MASK));
1192: }
1193: */
1194:
1195: /** Test whether the text for current Posting p equals
1196: * current tokenText. */
1197: boolean postingEquals(final char[] tokenText,
1198: final int tokenTextLen) {
1199:
1200: final char[] text = charPool.buffers[p.textStart >> CHAR_BLOCK_SHIFT];
1201: assert text != null;
1202: int pos = p.textStart & CHAR_BLOCK_MASK;
1203:
1204: int tokenPos = 0;
1205: for (; tokenPos < tokenTextLen; pos++, tokenPos++)
1206: if (tokenText[tokenPos] != text[pos])
1207: return false;
1208: return 0xffff == text[pos];
1209: }
1210:
1211: /** Compares term text for two Posting instances and
1212: * returns -1 if p1 < p2; 1 if p1 > p2; else 0.
1213: */
1214: int comparePostings(Posting p1, Posting p2) {
1215: final char[] text1 = charPool.buffers[p1.textStart >> CHAR_BLOCK_SHIFT];
1216: int pos1 = p1.textStart & CHAR_BLOCK_MASK;
1217: final char[] text2 = charPool.buffers[p2.textStart >> CHAR_BLOCK_SHIFT];
1218: int pos2 = p2.textStart & CHAR_BLOCK_MASK;
1219: while (true) {
1220: final char c1 = text1[pos1++];
1221: final char c2 = text2[pos2++];
1222: if (c1 < c2)
1223: if (0xffff == c2)
1224: return 1;
1225: else
1226: return -1;
1227: else if (c2 < c1)
1228: if (0xffff == c1)
1229: return -1;
1230: else
1231: return 1;
1232: else if (0xffff == c1)
1233: return 0;
1234: }
1235: }
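// Note: 0xffff is the end-of-term sentinel in the char pool. Even though it is
// numerically the largest char value, the explicit 0xffff checks above invert
// the raw comparison, so a term that is a prefix of another (e.g. "ab" vs
// "abc") still sorts first, giving normal term ordering.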
1236:
1237: /** Write vInt into freq stream of current Posting */
1238: public void writeFreqVInt(int i) {
1239: while ((i & ~0x7F) != 0) {
1240: writeFreqByte((byte) ((i & 0x7f) | 0x80));
1241: i >>>= 7;
1242: }
1243: writeFreqByte((byte) i);
1244: }
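// writeFreqVInt above and the other write*VInt methods below use the same
// variable-length encoding as IndexOutput.writeVInt: seven bits per byte,
// low-order bits first, with the high bit set on every byte except the last.
// Worked example: 300 (binary 1 0010 1100) encodes as the two bytes 0xAC 0x02.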
1245:
1246: /** Write vInt into prox stream of current Posting */
1247: public void writeProxVInt(int i) {
1248: while ((i & ~0x7F) != 0) {
1249: writeProxByte((byte) ((i & 0x7f) | 0x80));
1250: i >>>= 7;
1251: }
1252: writeProxByte((byte) i);
1253: }
1254:
1255: /** Write byte into freq stream of current Posting */
1256: byte[] freq;
1257: int freqUpto;
1258:
1259: public void writeFreqByte(byte b) {
1260: assert freq != null;
1261: if (freq[freqUpto] != 0) {
1262: freqUpto = postingsPool.allocSlice(freq, freqUpto);
1263: freq = postingsPool.buffer;
1264: p.freqUpto = postingsPool.byteOffset;
1265: }
1266: freq[freqUpto++] = b;
1267: }
1268:
1269: /** Write byte into prox stream of current Posting */
1270: byte[] prox;
1271: int proxUpto;
1272:
1273: public void writeProxByte(byte b) {
1274: assert prox != null;
1275: if (prox[proxUpto] != 0) {
1276: proxUpto = postingsPool.allocSlice(prox, proxUpto);
1277: prox = postingsPool.buffer;
1278: p.proxUpto = postingsPool.byteOffset;
1279: assert prox != null;
1280: }
1281: prox[proxUpto++] = b;
1282: assert proxUpto != prox.length;
1283: }
1284:
1285: /** Currently only used to copy a payload into the prox
1286: * stream. */
1287: public void writeProxBytes(byte[] b, int offset, int len) {
1288: final int offsetEnd = offset + len;
1289: while (offset < offsetEnd) {
1290: if (prox[proxUpto] != 0) {
1291: // End marker
1292: proxUpto = postingsPool.allocSlice(prox, proxUpto);
1293: prox = postingsPool.buffer;
1294: p.proxUpto = postingsPool.byteOffset;
1295: }
1296:
1297: prox[proxUpto++] = b[offset++];
1298: assert proxUpto != prox.length;
1299: }
1300: }
1301:
1302: /** Write vInt into offsets stream of current
1303: * PostingVector */
1304: public void writeOffsetVInt(int i) {
1305: while ((i & ~0x7F) != 0) {
1306: writeOffsetByte((byte) ((i & 0x7f) | 0x80));
1307: i >>>= 7;
1308: }
1309: writeOffsetByte((byte) i);
1310: }
1311:
1312: byte[] offsets;
1313: int offsetUpto;
1314:
1315: /** Write byte into offsets stream of current
1316: * PostingVector */
1317: public void writeOffsetByte(byte b) {
1318: assert offsets != null;
1319: if (offsets[offsetUpto] != 0) {
1320: offsetUpto = vectorsPool
1321: .allocSlice(offsets, offsetUpto);
1322: offsets = vectorsPool.buffer;
1323: vector.offsetUpto = vectorsPool.byteOffset;
1324: }
1325: offsets[offsetUpto++] = b;
1326: }
1327:
1328: /** Write vInt into pos stream of current
1329: * PostingVector */
1330: public void writePosVInt(int i) {
1331: while ((i & ~0x7F) != 0) {
1332: writePosByte((byte) ((i & 0x7f) | 0x80));
1333: i >>>= 7;
1334: }
1335: writePosByte((byte) i);
1336: }
1337:
1338: byte[] pos;
1339: int posUpto;
1340:
1341: /** Write byte into pos stream of current
1342: * PostingVector */
1343: public void writePosByte(byte b) {
1344: assert pos != null;
1345: if (pos[posUpto] != 0) {
1346: posUpto = vectorsPool.allocSlice(pos, posUpto);
1347: pos = vectorsPool.buffer;
1348: vector.posUpto = vectorsPool.byteOffset;
1349: }
1350: pos[posUpto++] = b;
1351: }
1352:
1353: PostingVector[] postingsVectors = new PostingVector[1];
1354: int maxPostingsVectors;
1355:
1356: // Used to read a string value for a field
1357: ReusableStringReader stringReader = new ReusableStringReader();
1358:
1359: /** Holds data associated with a single field, including
1360: * the Postings hash. A document may have many
1361: * occurrences for a given field name; we gather all
1362: * such occurrences here (in docFields) so that we can
1363: * process the entire field at once. */
1364: private final class FieldData implements Comparable {
1365:
1366: ThreadState threadState;
1367: FieldInfo fieldInfo;
1368:
1369: int fieldCount;
1370: Fieldable[] docFields = new Fieldable[1];
1371:
1372: int lastGen = -1;
1373: FieldData next;
1374:
1375: boolean doNorms;
1376: boolean doVectors;
1377: boolean doVectorPositions;
1378: boolean doVectorOffsets;
1379: boolean postingsCompacted;
1380:
1381: int numPostings;
1382:
1383: Posting[] postingsHash;
1384: int postingsHashSize;
1385: int postingsHashHalfSize;
1386: int postingsHashMask;
1387:
1388: int position;
1389: int length;
1390: int offset;
1391: float boost;
1392: int postingsVectorsUpto;
1393:
1394: public FieldData(FieldInfo fieldInfo) {
1395: this.fieldInfo = fieldInfo;
1396: threadState = ThreadState.this;
1397: }
1398:
1399: void resetPostingArrays() {
1400: if (!postingsCompacted)
1401: compactPostings();
1402: recyclePostings(this .postingsHash, numPostings);
1403: Arrays.fill(postingsHash, 0, postingsHash.length, null);
1404: postingsCompacted = false;
1405: numPostings = 0;
1406: }
1407:
1408: void initPostingArrays() {
1409: // Target hash fill factor of <= 50%
1410: // NOTE: must be a power of two for hash collision
1411: // strategy to work correctly
1412: postingsHashSize = 4;
1413: postingsHashHalfSize = 2;
1414: postingsHashMask = postingsHashSize - 1;
1415: postingsHash = new Posting[postingsHashSize];
1416: }
1417:
1418: /** So Arrays.sort can sort us. */
1419: public int compareTo(Object o) {
1420: return fieldInfo.name
1421: .compareTo(((FieldData) o).fieldInfo.name);
1422: }
1423:
1424: private void compactPostings() {
1425: int upto = 0;
1426: for (int i = 0; i < postingsHashSize; i++)
1427: if (postingsHash[i] != null)
1428: postingsHash[upto++] = postingsHash[i];
1429:
1430: assert upto == numPostings;
1431: postingsCompacted = true;
1432: }
1433:
1434: /** Collapse the hash table & sort in-place. */
1435: public Posting[] sortPostings() {
1436: compactPostings();
1437: doPostingSort(postingsHash, numPostings);
1438: return postingsHash;
1439: }
1440:
1441: /** Process all occurrences of one field in the document. */
1442: public void processField(Analyzer analyzer)
1443: throws IOException, AbortException {
1444: length = 0;
1445: position = 0;
1446: offset = 0;
1447: boost = docBoost;
1448:
1449: final int maxFieldLength = writer.getMaxFieldLength();
1450:
1451: final int limit = fieldCount;
1452: final Fieldable[] docFieldsFinal = docFields;
1453:
1454: boolean doWriteVectors = true;
1455:
1456: // Walk through all occurrences in this doc for this
1457: // field:
1458: try {
1459: for (int j = 0; j < limit; j++) {
1460: Fieldable field = docFieldsFinal[j];
1461:
1462: if (field.isIndexed())
1463: invertField(field, analyzer, maxFieldLength);
1464:
1465: if (field.isStored()) {
1466: numStoredFields++;
1467: boolean success = false;
1468: try {
1469: localFieldsWriter.writeField(fieldInfo,
1470: field);
1471: success = true;
1472: } finally {
1473: // If we hit an exception inside
1474: // localFieldsWriter.writeField, the
1475: // contents of fdtLocal can be corrupt, so
1476: // we must discard all stored fields for
1477: // this document:
1478: if (!success)
1479: fdtLocal.reset();
1480: }
1481: }
1482:
1483: docFieldsFinal[j] = null;
1484: }
1485: } catch (AbortException ae) {
1486: doWriteVectors = false;
1487: throw ae;
1488: } finally {
1489: if (postingsVectorsUpto > 0) {
1490: try {
1491: if (doWriteVectors) {
1492: // Add term vectors for this field
1493: boolean success = false;
1494: try {
1495: writeVectors(fieldInfo);
1496: success = true;
1497: } finally {
1498: if (!success) {
1499: // If we hit an exception inside
1500: // writeVectors, the contents of tvfLocal
1501: // can be corrupt, so we must discard all
1502: // term vectors for this document:
1503: numVectorFields = 0;
1504: tvfLocal.reset();
1505: }
1506: }
1507: }
1508: } finally {
1509: if (postingsVectorsUpto > maxPostingsVectors)
1510: maxPostingsVectors = postingsVectorsUpto;
1511: postingsVectorsUpto = 0;
1512: vectorsPool.reset();
1513: }
1514: }
1515: }
1516: }
1517:
1518: int offsetEnd;
1519: Token localToken = new Token();
1520:
1521: /* Invert one occurrence of one field in the document */
1522: public void invertField(Fieldable field, Analyzer analyzer,
1523: final int maxFieldLength) throws IOException,
1524: AbortException {
1525:
1526: if (length > 0)
1527: position += analyzer
1528: .getPositionIncrementGap(fieldInfo.name);
1529:
1530: if (!field.isTokenized()) { // un-tokenized field
1531: String stringValue = field.stringValue();
1532: final int valueLength = stringValue.length();
1533: Token token = localToken;
1534: token.clear();
1535: char[] termBuffer = token.termBuffer();
1536: if (termBuffer.length < valueLength)
1537: termBuffer = token
1538: .resizeTermBuffer(valueLength);
1539: stringValue.getChars(0, valueLength, termBuffer, 0);
1540: token.setTermLength(valueLength);
1541: token.setStartOffset(offset);
1542: token.setEndOffset(offset + stringValue.length());
1543: addPosition(token);
1544: offset += stringValue.length();
1545: length++;
1546: } else { // tokenized field
1547: final TokenStream stream;
1548: final TokenStream streamValue = field
1549: .tokenStreamValue();
1550:
1551: if (streamValue != null)
1552: stream = streamValue;
1553: else {
1554: // the field does not have a TokenStream,
1555: // so we have to obtain one from the analyzer
1556: final Reader reader; // find or make Reader
1557: final Reader readerValue = field.readerValue();
1558:
1559: if (readerValue != null)
1560: reader = readerValue;
1561: else {
1562: String stringValue = field.stringValue();
1563: if (stringValue == null)
1564: throw new IllegalArgumentException(
1565: "field must have either TokenStream, String or Reader value");
1566: stringReader.init(stringValue);
1567: reader = stringReader;
1568: }
1569:
1570: // Tokenize field and add to postingTable
1571: stream = analyzer.reusableTokenStream(
1572: fieldInfo.name, reader);
1573: }
1574:
1575: // reset the TokenStream to the first token
1576: stream.reset();
1577:
1578: try {
1579: offsetEnd = offset - 1;
1580: Token token;
1581: for (;;) {
1582: token = stream.next(localToken);
1583: if (token == null)
1584: break;
1585: position += (token.getPositionIncrement() - 1);
1586: addPosition(token);
1587: if (++length >= maxFieldLength) {
1588: if (infoStream != null)
1589: infoStream
1590: .println("maxFieldLength "
1591: + maxFieldLength
1592: + " reached for field "
1593: + fieldInfo.name
1594: + ", ignoring following tokens");
1595: break;
1596: }
1597: }
1598: offset = offsetEnd + 1;
1599: } finally {
1600: stream.close();
1601: }
1602: }
1603:
1604: boost *= field.getBoost();
1605: }
1606:
1607: /** Only called when term vectors are enabled. This
1608: * is called the first time we see a given term for
1609: * each document, to allocate a PostingVector
1610: * instance that is used to record data needed to
1611: * write the posting vectors. */
1612: private PostingVector addNewVector() {
1613:
1614: if (postingsVectorsUpto == postingsVectors.length) {
1615: final int newSize;
1616: if (postingsVectors.length < 2)
1617: newSize = 2;
1618: else
1619: newSize = (int) (1.5 * postingsVectors.length);
1620: PostingVector[] newArray = new PostingVector[newSize];
1621: System.arraycopy(postingsVectors, 0, newArray, 0,
1622: postingsVectors.length);
1623: postingsVectors = newArray;
1624: }
1625:
1626: p.vector = postingsVectors[postingsVectorsUpto];
1627: if (p.vector == null)
1628: p.vector = postingsVectors[postingsVectorsUpto] = new PostingVector();
1629:
1630: postingsVectorsUpto++;
1631:
1632: final PostingVector v = p.vector;
1633: v.p = p;
1634:
1635: final int firstSize = levelSizeArray[0];
1636:
1637: if (doVectorPositions) {
1638: final int upto = vectorsPool.newSlice(firstSize);
1639: v.posStart = v.posUpto = vectorsPool.byteOffset
1640: + upto;
1641: }
1642:
1643: if (doVectorOffsets) {
1644: final int upto = vectorsPool.newSlice(firstSize);
1645: v.offsetStart = v.offsetUpto = vectorsPool.byteOffset
1646: + upto;
1647: }
1648:
1649: return v;
1650: }
1651:
1652: int offsetStartCode;
1653: int offsetStart;
1654:
1655: /** This is the hotspot of indexing: it's called once
1656: * for every term of every document. Its job is to
1657: * update the postings byte stream (Postings hash)
1658: * based on the occurrence of a single term. */
1659: private void addPosition(Token token) throws AbortException {
1660:
1661: final Payload payload = token.getPayload();
1662:
1663: // Get the text of this term. Term can either
1664: // provide a String token or offset into a char[]
1665: // array
1666: final char[] tokenText = token.termBuffer();
1667: final int tokenTextLen = token.termLength();
1668:
1669: int code = 0;
1670:
1671: // Compute hashcode
1672: int downto = tokenTextLen;
1673: while (downto > 0)
1674: code = (code * 31) + tokenText[--downto];
1675:
1676: // System.out.println(" addPosition: buffer=" + new String(tokenText, 0, tokenTextLen) + " pos=" + position + " offsetStart=" + (offset+token.startOffset()) + " offsetEnd=" + (offset + token.endOffset()) + " docID=" + docID + " doPos=" + doVectorPositions + " doOffset=" + doVectorOffsets);
1677:
1678: int hashPos = code & postingsHashMask;
1679:
1680: assert !postingsCompacted;
1681:
1682: // Locate Posting in hash
1683: p = postingsHash[hashPos];
1684:
1685: if (p != null
1686: && !postingEquals(tokenText, tokenTextLen)) {
1687: // Conflict: keep searching different locations in
1688: // the hash table.
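// The probe step is forced odd (| 1), so with the power-of-two
// table size every slot is eventually visited.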
1689: final int inc = ((code >> 8) + code) | 1;
1690: do {
1691: code += inc;
1692: hashPos = code & postingsHashMask;
1693: p = postingsHash[hashPos];
1694: } while (p != null
1695: && !postingEquals(tokenText, tokenTextLen));
1696: }
1697:
1698: final int proxCode;
1699:
1700: // If we hit an exception below, it's possible the
1701: // posting list or term vectors data will be
1702: // partially written and thus inconsistent if
1703: // flushed, so we have to abort all documents
1704: // since the last flush:
1705:
1706: try {
1707:
1708: if (p != null) { // term seen since last flush
1709:
1710: if (docID != p.lastDocID) { // term not yet seen in this doc
1711:
1712: // System.out.println(" seen before (new docID=" + docID + ") freqUpto=" + p.freqUpto +" proxUpto=" + p.proxUpto);
1713:
1714: assert p.docFreq > 0;
1715:
1716: // Now that we know doc freq for previous doc,
1717: // write it & lastDocCode
1718: freqUpto = p.freqUpto & BYTE_BLOCK_MASK;
1719: freq = postingsPool.buffers[p.freqUpto >> BYTE_BLOCK_SHIFT];
1720: if (1 == p.docFreq)
1721: writeFreqVInt(p.lastDocCode | 1);
1722: else {
1723: writeFreqVInt(p.lastDocCode);
1724: writeFreqVInt(p.docFreq);
1725: }
1726: p.freqUpto = freqUpto
1727: + (p.freqUpto & BYTE_BLOCK_NOT_MASK);
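// Illustrative example of the deferred write above: if this term was first
// seen in doc 3 with freq 2 and is now seen again in doc 7, we just wrote
// VInt(3<<1)=6 followed by VInt(2); below we record lastDocCode=(7-3)<<1=8
// for doc 7, and if doc 7 ends up with freq 1 the eventual write is the
// single byte VInt(8|1)=9 (low bit set means freq==1).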
1728:
1729: if (doVectors) {
1730: vector = addNewVector();
1731: if (doVectorOffsets) {
1732: offsetStartCode = offsetStart = offset
1733: + token.startOffset();
1734: offsetEnd = offset
1735: + token.endOffset();
1736: }
1737: }
1738:
1739: proxCode = position;
1740:
1741: p.docFreq = 1;
1742:
1743: // Store code so we can write this after we're
1744: // done with this new doc
1745: p.lastDocCode = (docID - p.lastDocID) << 1;
1746: p.lastDocID = docID;
1747:
1748: } else { // term already seen in this doc
1749: // System.out.println(" seen before (same docID=" + docID + ") proxUpto=" + p.proxUpto);
1750: p.docFreq++;
1751:
1752: proxCode = position - p.lastPosition;
1753:
1754: if (doVectors) {
1755: vector = p.vector;
1756: if (vector == null)
1757: vector = addNewVector();
1758: if (doVectorOffsets) {
1759: offsetStart = offset
1760: + token.startOffset();
1761: offsetEnd = offset
1762: + token.endOffset();
1763: offsetStartCode = offsetStart
1764: - vector.lastOffset;
1765: }
1766: }
1767: }
1768: } else { // term not seen before
1769: // System.out.println(" never seen docID=" + docID);
1770:
1771: // Refill?
1772: if (0 == postingsFreeCount) {
1773: getPostings(postingsFreeList);
1774: postingsFreeCount = postingsFreeList.length;
1775: }
1776:
1777: final int textLen1 = 1 + tokenTextLen;
1778: if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE) {
1779: if (textLen1 > CHAR_BLOCK_SIZE) {
1780: // Just skip this term, to remain as robust as
1781: // possible during indexing. A TokenFilter
1782: // can be inserted into the analyzer chain if
1783: // other behavior is wanted (pruning the term
1784: // to a prefix, throwing an exception, etc).
1785: if (maxTermPrefix == null)
1786: maxTermPrefix = new String(
1787: tokenText, 0, 30);
1788:
1789: // Still increment position:
1790: position++;
1791: return;
1792: }
1793: charPool.nextBuffer();
1794: }
1795: final char[] text = charPool.buffer;
1796: final int textUpto = charPool.byteUpto;
1797:
1798: // Pull next free Posting from free list
1799: p = postingsFreeList[--postingsFreeCount];
1800:
1801: p.textStart = textUpto + charPool.byteOffset;
1802: charPool.byteUpto += textLen1;
1803:
1804: System.arraycopy(tokenText, 0, text, textUpto,
1805: tokenTextLen);
1806:
1807: text[textUpto + tokenTextLen] = 0xffff;
1808:
1809: assert postingsHash[hashPos] == null;
1810:
1811: postingsHash[hashPos] = p;
1812: numPostings++;
1813:
1814: if (numPostings == postingsHashHalfSize)
1815: rehashPostings(2 * postingsHashSize);
1816:
1817: // Init first slice for freq & prox streams
1818: final int firstSize = levelSizeArray[0];
1819:
1820: final int upto1 = postingsPool
1821: .newSlice(firstSize);
1822: p.freqStart = p.freqUpto = postingsPool.byteOffset
1823: + upto1;
1824:
1825: final int upto2 = postingsPool
1826: .newSlice(firstSize);
1827: p.proxStart = p.proxUpto = postingsPool.byteOffset
1828: + upto2;
1829:
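// lastDocCode is the doc delta shifted left one bit; when this entry
// is next written (on the next doc or at flush), the low bit is set
// to flag docFreq == 1.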
1830: p.lastDocCode = docID << 1;
1831: p.lastDocID = docID;
1832: p.docFreq = 1;
1833:
1834: if (doVectors) {
1835: vector = addNewVector();
1836: if (doVectorOffsets) {
1837: offsetStart = offsetStartCode = offset
1838: + token.startOffset();
1839: offsetEnd = offset + token.endOffset();
1840: }
1841: }
1842:
1843: proxCode = position;
1844: }
1845:
1846: proxUpto = p.proxUpto & BYTE_BLOCK_MASK;
1847: prox = postingsPool.buffers[p.proxUpto >> BYTE_BLOCK_SHIFT];
1848: assert prox != null;
1849:
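// Append this occurrence to the prox stream: the position delta is
// shifted left one bit, and the low bit flags whether a payload
// (length + bytes) follows.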
1850: if (payload != null && payload.length > 0) {
1851: writeProxVInt((proxCode << 1) | 1);
1852: writeProxVInt(payload.length);
1853: writeProxBytes(payload.data, payload.offset,
1854: payload.length);
1855: fieldInfo.storePayloads = true;
1856: } else
1857: writeProxVInt(proxCode << 1);
1858:
1859: p.proxUpto = proxUpto
1860: + (p.proxUpto & BYTE_BLOCK_NOT_MASK);
1861:
1862: p.lastPosition = position++;
1863:
1864: if (doVectorPositions) {
1865: posUpto = vector.posUpto & BYTE_BLOCK_MASK;
1866: pos = vectorsPool.buffers[vector.posUpto >> BYTE_BLOCK_SHIFT];
1867: writePosVInt(proxCode);
1868: vector.posUpto = posUpto
1869: + (vector.posUpto & BYTE_BLOCK_NOT_MASK);
1870: }
1871:
1872: if (doVectorOffsets) {
1873: offsetUpto = vector.offsetUpto
1874: & BYTE_BLOCK_MASK;
1875: offsets = vectorsPool.buffers[vector.offsetUpto >> BYTE_BLOCK_SHIFT];
1876: writeOffsetVInt(offsetStartCode);
1877: writeOffsetVInt(offsetEnd - offsetStart);
1878: vector.lastOffset = offsetEnd;
1879: vector.offsetUpto = offsetUpto
1880: + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
1881: }
1882: } catch (Throwable t) {
1883: throw new AbortException(t, DocumentsWriter.this);
1884: }
1885: }
1886:
1887: /** Called when postings hash is too small (> 50%
1888: * occupied) or too large (< 20% occupied). */
1889: void rehashPostings(final int newSize) {
1890:
1891: final int newMask = newSize - 1;
1892:
1893: Posting[] newHash = new Posting[newSize];
1894: for (int i = 0; i < postingsHashSize; i++) {
1895: Posting p0 = postingsHash[i];
1896: if (p0 != null) {
1897: final int start = p0.textStart
1898: & CHAR_BLOCK_MASK;
1899: final char[] text = charPool.buffers[p0.textStart >> CHAR_BLOCK_SHIFT];
1900: int pos = start;
1901: while (text[pos] != 0xffff)
1902: pos++;
1903: int code = 0;
1904: while (pos > start)
1905: code = (code * 31) + text[--pos];
1906:
1907: int hashPos = code & newMask;
1908: assert hashPos >= 0;
1909: if (newHash[hashPos] != null) {
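// Collision: probe with an odd increment derived from the hash code
// so that every slot of the power-of-two sized table is visited.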
1910: final int inc = ((code >> 8) + code) | 1;
1911: do {
1912: code += inc;
1913: hashPos = code & newMask;
1914: } while (newHash[hashPos] != null);
1915: }
1916: newHash[hashPos] = p0;
1917: }
1918: }
1919:
1920: postingsHashMask = newMask;
1921: postingsHash = newHash;
1922: postingsHashSize = newSize;
1923: postingsHashHalfSize = newSize >> 1;
1924: }
1925:
1926: final ByteSliceReader vectorSliceReader = new ByteSliceReader();
1927:
1928: /** Called once per field per document if term vectors
1929: * are enabled, to write the vectors to the
1930: * RAMOutputStream, which is then quickly flushed to
1931: * the real term vectors files in the Directory. */
1932: void writeVectors(FieldInfo fieldInfo) throws IOException {
1933:
1934: assert fieldInfo.storeTermVector;
1935:
1936: vectorFieldNumbers[numVectorFields] = fieldInfo.number;
1937: vectorFieldPointers[numVectorFields] = tvfLocal
1938: .getFilePointer();
1939: numVectorFields++;
1940:
1941: final int numPostingsVectors = postingsVectorsUpto;
1942:
1943: tvfLocal.writeVInt(numPostingsVectors);
1944: byte bits = 0x0;
1945: if (doVectorPositions)
1946: bits |= TermVectorsReader.STORE_POSITIONS_WITH_TERMVECTOR;
1947: if (doVectorOffsets)
1948: bits |= TermVectorsReader.STORE_OFFSET_WITH_TERMVECTOR;
1949: tvfLocal.writeByte(bits);
1950:
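// Sort the vectors so terms are written in lexicographic order;
// each term is then prefix-compressed against the previous one.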
1951: doVectorSort(postingsVectors, numPostingsVectors);
1952:
1953: Posting lastPosting = null;
1954:
1955: final ByteSliceReader reader = vectorSliceReader;
1956:
1957: for (int j = 0; j < numPostingsVectors; j++) {
1958: PostingVector vector = postingsVectors[j];
1959: Posting posting = vector.p;
1960: final int freq = posting.docFreq;
1961:
1962: final int prefix;
1963: final char[] text2 = charPool.buffers[posting.textStart >> CHAR_BLOCK_SHIFT];
1964: final int start2 = posting.textStart
1965: & CHAR_BLOCK_MASK;
1966: int pos2 = start2;
1967:
1968: // Compute common prefix between last term and
1969: // this term
1970: if (lastPosting == null)
1971: prefix = 0;
1972: else {
1973: final char[] text1 = charPool.buffers[lastPosting.textStart >> CHAR_BLOCK_SHIFT];
1974: final int start1 = lastPosting.textStart
1975: & CHAR_BLOCK_MASK;
1976: int pos1 = start1;
1977: while (true) {
1978: final char c1 = text1[pos1];
1979: final char c2 = text2[pos2];
1980: if (c1 != c2 || c1 == 0xffff) {
1981: prefix = pos1 - start1;
1982: break;
1983: }
1984: pos1++;
1985: pos2++;
1986: }
1987: }
1988: lastPosting = posting;
1989:
1990: // Compute length
1991: while (text2[pos2] != 0xffff)
1992: pos2++;
1993:
1994: final int suffix = pos2 - start2 - prefix;
1995: tvfLocal.writeVInt(prefix);
1996: tvfLocal.writeVInt(suffix);
1997: tvfLocal.writeChars(text2, start2 + prefix, suffix);
1998: tvfLocal.writeVInt(freq);
1999:
2000: if (doVectorPositions) {
2001: reader.init(vectorsPool, vector.posStart,
2002: vector.posUpto);
2003: reader.writeTo(tvfLocal);
2004: }
2005:
2006: if (doVectorOffsets) {
2007: reader.init(vectorsPool, vector.offsetStart,
2008: vector.offsetUpto);
2009: reader.writeTo(tvfLocal);
2010: }
2011: }
2012: }
2013: }
2014: }
2015:
2016: private static final byte defaultNorm = Similarity.encodeNorm(1.0f);
2017:
2018: /** Write norms in the "true" segment format. This is
2019: * called only during commit, to create the .nrm file. */
2020: void writeNorms(String segmentName, int totalNumDoc)
2021: throws IOException {
2022:
2023: IndexOutput normsOut = directory.createOutput(segmentName + "."
2024: + IndexFileNames.NORMS_EXTENSION);
2025:
2026: try {
2027: normsOut.writeBytes(SegmentMerger.NORMS_HEADER, 0,
2028: SegmentMerger.NORMS_HEADER.length);
2029:
2030: final int numField = fieldInfos.size();
2031:
2032: for (int fieldIdx = 0; fieldIdx < numField; fieldIdx++) {
2033: FieldInfo fi = fieldInfos.fieldInfo(fieldIdx);
2034: if (fi.isIndexed && !fi.omitNorms) {
2035: BufferedNorms n = norms[fieldIdx];
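// Each norm is a single byte, so the stream's file pointer equals
// the number of docs that received a norm for this field; trailing
// docs that lacked the field are padded with the default norm below.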
2036: final long v;
2037: if (n == null)
2038: v = 0;
2039: else {
2040: v = n.out.getFilePointer();
2041: n.out.writeTo(normsOut);
2042: n.reset();
2043: }
2044: if (v < totalNumDoc)
2045: fillBytes(normsOut, defaultNorm,
2046: (int) (totalNumDoc - v));
2047: }
2048: }
2049: } finally {
2050: normsOut.close();
2051: }
2052: }
2053:
2054: private DefaultSkipListWriter skipListWriter = null;
2055:
2056: private boolean currentFieldStorePayloads;
2057:
2058: /** Creates a segment from all Postings in the Postings
2059: * hashes across all ThreadStates & FieldDatas. */
2060: private List writeSegment() throws IOException {
2061:
2062: assert allThreadsIdle();
2063:
2064: assert nextDocID == numDocsInRAM;
2065:
2066: final String segmentName;
2067:
2068: segmentName = segment;
2069:
2070: TermInfosWriter termsOut = new TermInfosWriter(directory,
2071: segmentName, fieldInfos, writer.getTermIndexInterval());
2072:
2073: IndexOutput freqOut = directory.createOutput(segmentName
2074: + ".frq");
2075: IndexOutput proxOut = directory.createOutput(segmentName
2076: + ".prx");
2077:
2078: // Gather all FieldData's that have postings, across all
2079: // ThreadStates
2080: ArrayList allFields = new ArrayList();
2081: assert allThreadsIdle();
2082: for (int i = 0; i < threadStates.length; i++) {
2083: ThreadState state = threadStates[i];
2084: state.trimFields();
2085: final int numFields = state.numAllFieldData;
2086: for (int j = 0; j < numFields; j++) {
2087: ThreadState.FieldData fp = state.allFieldDataArray[j];
2088: if (fp.numPostings > 0)
2089: allFields.add(fp);
2090: }
2091: }
2092:
2093: // Sort by field name
2094: Collections.sort(allFields);
2095: final int numAllFields = allFields.size();
2096:
2097: skipListWriter = new DefaultSkipListWriter(
2098: termsOut.skipInterval, termsOut.maxSkipLevels,
2099: numDocsInRAM, freqOut, proxOut);
2100:
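// allFields is sorted by field name; process each run of same-named
// fields (one per ThreadState that indexed the field) as a unit,
// merging their postings into the segment.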
2101: int start = 0;
2102: while (start < numAllFields) {
2103:
2104: final String fieldName = ((ThreadState.FieldData) allFields
2105: .get(start)).fieldInfo.name;
2106:
2107: int end = start + 1;
2108: while (end < numAllFields
2109: && ((ThreadState.FieldData) allFields.get(end)).fieldInfo.name
2110: .equals(fieldName))
2111: end++;
2112:
2113: ThreadState.FieldData[] fields = new ThreadState.FieldData[end
2114: - start];
2115: for (int i = start; i < end; i++)
2116: fields[i - start] = (ThreadState.FieldData) allFields
2117: .get(i);
2118:
2119: // If this field has postings then add them to the
2120: // segment
2121: appendPostings(fields, termsOut, freqOut, proxOut);
2122:
2123: for (int i = 0; i < fields.length; i++)
2124: fields[i].resetPostingArrays();
2125:
2126: start = end;
2127: }
2128:
2129: freqOut.close();
2130: proxOut.close();
2131: termsOut.close();
2132:
2133: // Record all files we have flushed
2134: List flushedFiles = new ArrayList();
2135: flushedFiles
2136: .add(segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION));
2137: flushedFiles
2138: .add(segmentFileName(IndexFileNames.FREQ_EXTENSION));
2139: flushedFiles
2140: .add(segmentFileName(IndexFileNames.PROX_EXTENSION));
2141: flushedFiles
2142: .add(segmentFileName(IndexFileNames.TERMS_EXTENSION));
2143: flushedFiles
2144: .add(segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));
2145:
2146: if (hasNorms) {
2147: writeNorms(segmentName, numDocsInRAM);
2148: flushedFiles
2149: .add(segmentFileName(IndexFileNames.NORMS_EXTENSION));
2150: }
2151:
2152: if (infoStream != null) {
2153: final long newSegmentSize = segmentSize(segmentName);
2154: String message = " oldRAMSize="
2155: + numBytesUsed
2156: + " newFlushedSize="
2157: + newSegmentSize
2158: + " docs/MB="
2159: + nf.format(numDocsInRAM
2160: / (newSegmentSize / 1024. / 1024.))
2161: + " new/old="
2162: + nf.format(100.0 * newSegmentSize / numBytesUsed)
2163: + "%";
2164: infoStream.println(message);
2165: }
2166:
2167: resetPostingsData();
2168:
2169: nextDocID = 0;
2170: nextWriteDocID = 0;
2171: numDocsInRAM = 0;
2172: files = null;
2173:
2174: // Maybe downsize postingsFreeList array
2175: if (postingsFreeList.length > 1.5 * postingsFreeCount) {
2176: int newSize = postingsFreeList.length;
2177: while (newSize > 1.25 * postingsFreeCount) {
2178: newSize = (int) (newSize * 0.8);
2179: }
2180: Posting[] newArray = new Posting[newSize];
2181: System.arraycopy(postingsFreeList, 0, newArray, 0,
2182: postingsFreeCount);
2183: postingsFreeList = newArray;
2184: }
2185:
2186: return flushedFiles;
2187: }
2188:
2189: /** Returns the name of the file with the given extension
2190: * for the segment we are currently working on. */
2191: private String segmentFileName(String extension) {
2192: return segment + "." + extension;
2193: }
2194:
2195: private final TermInfo termInfo = new TermInfo(); // minimize consing
2196:
2197: /** Used to merge the postings from multiple ThreadStates
2198: * when creating a segment */
2199: final static class FieldMergeState {
2200:
2201: private ThreadState.FieldData field;
2202:
2203: private Posting[] postings;
2204:
2205: private Posting p;
2206: private char[] text;
2207: private int textOffset;
2208:
2209: private int postingUpto = -1;
2210:
2211: private ByteSliceReader freq = new ByteSliceReader();
2212: private ByteSliceReader prox = new ByteSliceReader();
2213:
2214: private int docID;
2215: private int termFreq;
2216:
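// Advance to the next term in this field's sorted postings;
// returns false once all terms have been consumed.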
2217: boolean nextTerm() throws IOException {
2218: postingUpto++;
2219: if (postingUpto == field.numPostings)
2220: return false;
2221:
2222: p = postings[postingUpto];
2223: docID = 0;
2224:
2225: text = field.threadState.charPool.buffers[p.textStart >> CHAR_BLOCK_SHIFT];
2226: textOffset = p.textStart & CHAR_BLOCK_MASK;
2227:
2228: if (p.freqUpto > p.freqStart)
2229: freq.init(field.threadState.postingsPool, p.freqStart,
2230: p.freqUpto);
2231: else
2232: freq.bufferOffset = freq.upto = freq.endIndex = 0;
2233:
2234: prox.init(field.threadState.postingsPool, p.proxStart,
2235: p.proxUpto);
2236:
2237: // Should always be true
2238: boolean result = nextDoc();
2239: assert result;
2240:
2241: return true;
2242: }
2243:
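// Advance to the next doc for the current term. The final doc's
// entry is never written to the freq stream (it is still pending in
// lastDocCode/docFreq), so it is returned once the stream is exhausted.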
2244: public boolean nextDoc() throws IOException {
2245: if (freq.bufferOffset + freq.upto == freq.endIndex) {
2246: if (p.lastDocCode != -1) {
2247: // Return last doc
2248: docID = p.lastDocID;
2249: termFreq = p.docFreq;
2250: p.lastDocCode = -1;
2251: return true;
2252: } else
2253: // EOF
2254: return false;
2255: }
2256:
2257: final int code = freq.readVInt();
2258: docID += code >>> 1;
2259: if ((code & 1) != 0)
2260: termFreq = 1;
2261: else
2262: termFreq = freq.readVInt();
2263:
2264: return true;
2265: }
2266: }
2267:
2268: int compareText(final char[] text1, int pos1, final char[] text2,
2269: int pos2) {
2270: while (true) {
2271: final char c1 = text1[pos1++];
2272: final char c2 = text2[pos2++];
2273: if (c1 < c2)
2274: if (0xffff == c2)
2275: return 1;
2276: else
2277: return -1;
2278: else if (c2 < c1)
2279: if (0xffff == c1)
2280: return -1;
2281: else
2282: return 1;
2283: else if (0xffff == c1)
2284: return 0;
2285: }
2286: }
2287:
2288: /* Walk through all unique text tokens (Posting
2289: * instances) found in this field and serialize them
2290: * into a single RAM segment. */
2291: void appendPostings(ThreadState.FieldData[] fields,
2292: TermInfosWriter termsOut, IndexOutput freqOut,
2293: IndexOutput proxOut) throws CorruptIndexException,
2294: IOException {
2295:
2296: final int fieldNumber = fields[0].fieldInfo.number;
2297: int numFields = fields.length;
2298:
2299: final FieldMergeState[] mergeStates = new FieldMergeState[numFields];
2300:
2301: for (int i = 0; i < numFields; i++) {
2302: FieldMergeState fms = mergeStates[i] = new FieldMergeState();
2303: fms.field = fields[i];
2304: fms.postings = fms.field.sortPostings();
2305:
2306: assert fms.field.fieldInfo == fields[0].fieldInfo;
2307:
2308: // Should always be true
2309: boolean result = fms.nextTerm();
2310: assert result;
2311: }
2312:
2313: final int skipInterval = termsOut.skipInterval;
2314: currentFieldStorePayloads = fields[0].fieldInfo.storePayloads;
2315:
2316: FieldMergeState[] termStates = new FieldMergeState[numFields];
2317:
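// K-way merge: each pass picks the smallest term across the
// remaining FieldMergeStates, then interleaves that term's doc
// streams in docID order.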
2318: while (numFields > 0) {
2319:
2320: // Get the next term to merge
2321: termStates[0] = mergeStates[0];
2322: int numToMerge = 1;
2323:
2324: for (int i = 1; i < numFields; i++) {
2325: final char[] text = mergeStates[i].text;
2326: final int textOffset = mergeStates[i].textOffset;
2327: final int cmp = compareText(text, textOffset,
2328: termStates[0].text, termStates[0].textOffset);
2329:
2330: if (cmp < 0) {
2331: termStates[0] = mergeStates[i];
2332: numToMerge = 1;
2333: } else if (cmp == 0)
2334: termStates[numToMerge++] = mergeStates[i];
2335: }
2336:
2337: int df = 0;
2338: int lastPayloadLength = -1;
2339:
2340: int lastDoc = 0;
2341:
2342: final char[] text = termStates[0].text;
2343: final int start = termStates[0].textOffset;
2344: int pos = start;
2345: while (text[pos] != 0xffff)
2346: pos++;
2347:
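// Record where this term's postings begin in the freq and prox
// files; these pointers (plus the skip pointer) form the term
// dictionary entry written below.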
2348: long freqPointer = freqOut.getFilePointer();
2349: long proxPointer = proxOut.getFilePointer();
2350:
2351: skipListWriter.resetSkip();
2352:
2353: // Now termStates has numToMerge FieldMergeStates
2354: // which all share the same term. Now we must
2355: // interleave the docID streams.
2356: while (numToMerge > 0) {
2357:
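// Every skipInterval-th document, buffer a skip entry (last doc
// and payload state) for this term.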
2358: if ((++df % skipInterval) == 0) {
2359: skipListWriter.setSkipData(lastDoc,
2360: currentFieldStorePayloads,
2361: lastPayloadLength);
2362: skipListWriter.bufferSkip(df);
2363: }
2364:
2365: FieldMergeState minState = termStates[0];
2366: for (int i = 1; i < numToMerge; i++)
2367: if (termStates[i].docID < minState.docID)
2368: minState = termStates[i];
2369:
2370: final int doc = minState.docID;
2371: final int termDocFreq = minState.termFreq;
2372:
2373: assert doc < numDocsInRAM;
2374: assert doc > lastDoc || df == 1;
2375:
2376: final int newDocCode = (doc - lastDoc) << 1;
2377: lastDoc = doc;
2378:
2379: final ByteSliceReader prox = minState.prox;
2380:
2381: // Carefully copy over the prox + payload info,
2382: // changing the format to match Lucene's segment
2383: // format.
2384: for (int j = 0; j < termDocFreq; j++) {
2385: final int code = prox.readVInt();
2386: if (currentFieldStorePayloads) {
2387: final int payloadLength;
2388: if ((code & 1) != 0) {
2389: // This position has a payload
2390: payloadLength = prox.readVInt();
2391: } else
2392: payloadLength = 0;
2393: if (payloadLength != lastPayloadLength) {
2394: proxOut.writeVInt(code | 1);
2395: proxOut.writeVInt(payloadLength);
2396: lastPayloadLength = payloadLength;
2397: } else
2398: proxOut.writeVInt(code & (~1));
2399: if (payloadLength > 0)
2400: copyBytes(prox, proxOut, payloadLength);
2401: } else {
2402: assert 0 == (code & 1);
2403: proxOut.writeVInt(code >> 1);
2404: }
2405: }
2406:
2407: if (1 == termDocFreq) {
2408: freqOut.writeVInt(newDocCode | 1);
2409: } else {
2410: freqOut.writeVInt(newDocCode);
2411: freqOut.writeVInt(termDocFreq);
2412: }
2413:
2414: if (!minState.nextDoc()) {
2415:
2416: // Remove from termStates
2417: int upto = 0;
2418: for (int i = 0; i < numToMerge; i++)
2419: if (termStates[i] != minState)
2420: termStates[upto++] = termStates[i];
2421: numToMerge--;
2422: assert upto == numToMerge;
2423:
2424: // Advance this state to the next term
2425:
2426: if (!minState.nextTerm()) {
2427: // OK, no more terms, so remove from mergeStates
2428: // as well
2429: upto = 0;
2430: for (int i = 0; i < numFields; i++)
2431: if (mergeStates[i] != minState)
2432: mergeStates[upto++] = mergeStates[i];
2433: numFields--;
2434: assert upto == numFields;
2435: }
2436: }
2437: }
2438:
2439: assert df > 0;
2440:
2441: // Done merging this term
2442:
2443: long skipPointer = skipListWriter.writeSkip(freqOut);
2444:
2445: // Write term
2446: termInfo.set(df, freqPointer, proxPointer,
2447: (int) (skipPointer - freqPointer));
2448: termsOut.add(fieldNumber, text, start, pos - start,
2449: termInfo);
2450: }
2451: }
2452:
2453: synchronized void close() {
2454: closed = true;
2455: notifyAll();
2456: }
2457:
2458: /** Returns a free (idle) ThreadState that may be used for
2459: * indexing this one document. This call also pauses if a
2460: * flush is pending. If delTerm is non-null then we
2461: * buffer this deleted term after the thread state has
2462: * been acquired. */
2463: synchronized ThreadState getThreadState(Document doc, Term delTerm)
2464: throws IOException {
2465:
2466: // First, find a thread state. If this thread already
2467: // has affinity to a specific ThreadState, use that one
2468: // again.
2469: ThreadState state = (ThreadState) threadBindings.get(Thread
2470: .currentThread());
2471: if (state == null) {
2472: // First time this thread has called us since last flush
2473: ThreadState minThreadState = null;
2474: for (int i = 0; i < threadStates.length; i++) {
2475: ThreadState ts = threadStates[i];
2476: if (minThreadState == null
2477: || ts.numThreads < minThreadState.numThreads)
2478: minThreadState = ts;
2479: }
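// Reuse the least-loaded ThreadState only if it is currently unused
// or we have already created the maximum number of ThreadStates;
// otherwise allocate a new private one below.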
2480: if (minThreadState != null
2481: && (minThreadState.numThreads == 0 || threadStates.length == MAX_THREAD_STATE)) {
2482: state = minThreadState;
2483: state.numThreads++;
2484: } else {
2485: // Just create a new "private" thread state
2486: ThreadState[] newArray = new ThreadState[1 + threadStates.length];
2487: if (threadStates.length > 0)
2488: System.arraycopy(threadStates, 0, newArray, 0,
2489: threadStates.length);
2490: state = newArray[threadStates.length] = new ThreadState();
2491: threadStates = newArray;
2492: }
2493: threadBindings.put(Thread.currentThread(), state);
2494: }
2495:
2496: // Next, wait until my thread state is idle (in case
2497: // it's shared with other threads), threads are not
2498: // paused, and no flush is pending:
2499: while (!closed
2500: && (!state.isIdle || pauseThreads != 0 || flushPending || abortCount > 0))
2501: try {
2502: wait();
2503: } catch (InterruptedException e) {
2504: Thread.currentThread().interrupt();
2505: }
2506:
2507: if (closed)
2508: throw new AlreadyClosedException(
2509: "this IndexWriter is closed");
2510:
2511: if (segment == null)
2512: segment = writer.newSegmentName();
2513:
2514: numDocsInRAM++;
2515:
2516: // We must at this point commit to flushing to ensure we
2517: // always get N docs when we flush by doc count, even if
2518: // > 1 thread is adding documents:
2519: if (!flushPending
2520: && maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH
2521: && numDocsInRAM >= maxBufferedDocs) {
2522: flushPending = true;
2523: state.doFlushAfter = true;
2524: } else
2525: state.doFlushAfter = false;
2526:
2527: state.isIdle = false;
2528:
2529: try {
2530: boolean success = false;
2531: try {
2532: state.init(doc, nextDocID);
2533: if (delTerm != null) {
2534: addDeleteTerm(delTerm, state.docID);
2535: if (!state.doFlushAfter)
2536: state.doFlushAfter = timeToFlushDeletes();
2537: }
2538: // Only increment nextDocID on successful init
2539: nextDocID++;
2540: success = true;
2541: } finally {
2542: if (!success) {
2543: // Forcefully idle this ThreadState:
2544: state.isIdle = true;
2545: notifyAll();
2546: if (state.doFlushAfter) {
2547: state.doFlushAfter = false;
2548: flushPending = false;
2549: }
2550: }
2551: }
2552: } catch (AbortException ae) {
2553: abort(ae);
2554: }
2555:
2556: return state;
2557: }
2558:
2559: /** Returns true if the caller (IndexWriter) should now
2560: * flush. */
2561: boolean addDocument(Document doc, Analyzer analyzer)
2562: throws CorruptIndexException, IOException {
2563: return updateDocument(doc, analyzer, null);
2564: }
2565:
2566: boolean updateDocument(Term t, Document doc, Analyzer analyzer)
2567: throws CorruptIndexException, IOException {
2568: return updateDocument(doc, analyzer, t);
2569: }
2570:
2571: boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
2572: throws CorruptIndexException, IOException {
2573:
2574: // This call is synchronized but fast
2575: final ThreadState state = getThreadState(doc, delTerm);
2576: try {
2577: boolean success = false;
2578: try {
2579: try {
2580: // This call is not synchronized and does all the work
2581: state.processDocument(analyzer);
2582: } finally {
2583: // This call is synchronized but fast
2584: finishDocument(state);
2585: }
2586: success = true;
2587: } finally {
2588: if (!success) {
2589: synchronized (this) {
2590: // Immediately mark this document as deleted
2591: // since likely it was partially added. This
2592: // keeps indexing as "all or none" (atomic) when
2593: // adding a document:
2594: addDeleteDocID(state.docID);
2595: }
2596: }
2597: }
2598: } catch (AbortException ae) {
2599: abort(ae);
2600: }
2601:
2602: return state.doFlushAfter || timeToFlushDeletes();
2603: }
2604:
2605: synchronized int getNumBufferedDeleteTerms() {
2606: return numBufferedDeleteTerms;
2607: }
2608:
2609: synchronized HashMap getBufferedDeleteTerms() {
2610: return bufferedDeleteTerms;
2611: }
2612:
2613: synchronized List getBufferedDeleteDocIDs() {
2614: return bufferedDeleteDocIDs;
2615: }
2616:
2617: // Reset buffered deletes.
2618: synchronized void clearBufferedDeletes() throws IOException {
2619: bufferedDeleteTerms.clear();
2620: bufferedDeleteDocIDs.clear();
2621: numBufferedDeleteTerms = 0;
2622: if (numBytesUsed > 0)
2623: resetPostingsData();
2624: }
2625:
2626: synchronized boolean bufferDeleteTerms(Term[] terms)
2627: throws IOException {
2628: while (pauseThreads != 0 || flushPending)
2629: try {
2630: wait();
2631: } catch (InterruptedException e) {
2632: Thread.currentThread().interrupt();
2633: }
2634: for (int i = 0; i < terms.length; i++)
2635: addDeleteTerm(terms[i], numDocsInRAM);
2636: return timeToFlushDeletes();
2637: }
2638:
2639: synchronized boolean bufferDeleteTerm(Term term) throws IOException {
2640: while (pauseThreads != 0 || flushPending)
2641: try {
2642: wait();
2643: } catch (InterruptedException e) {
2644: Thread.currentThread().interrupt();
2645: }
2646: addDeleteTerm(term, numDocsInRAM);
2647: return timeToFlushDeletes();
2648: }
2649:
2650: synchronized private boolean timeToFlushDeletes() {
2651: return (bufferIsFull || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && numBufferedDeleteTerms >= maxBufferedDeleteTerms))
2652: && setFlushPending();
2653: }
2654:
2655: void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
2656: this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
2657: }
2658:
2659: int getMaxBufferedDeleteTerms() {
2660: return maxBufferedDeleteTerms;
2661: }
2662:
2663: synchronized boolean hasDeletes() {
2664: return bufferedDeleteTerms.size() > 0
2665: || bufferedDeleteDocIDs.size() > 0;
2666: }
2667:
2668: // Number of documents a delete term applies to.
2669: static class Num {
2670: private int num;
2671:
2672: Num(int num) {
2673: this.num = num;
2674: }
2675:
2676: int getNum() {
2677: return num;
2678: }
2679:
2680: void setNum(int num) {
2681: // Only record the new number if it's greater than the
2682: // current one. This is important because if multiple
2683: // threads are replacing the same doc at nearly the
2684: // same time, it's possible that one thread that got a
2685: // higher docID is scheduled before the other
2686: // threads.
2687: if (num > this.num)
2688: this.num = num;
2689: }
2690: }
2691:
2692: // Buffer a term in bufferedDeleteTerms, which records the
2693: // current number of documents buffered in RAM so that the
2694: // delete term will be applied to those documents as well
2695: // as the disk segments.
2696: synchronized private void addDeleteTerm(Term term, int docCount) {
2697: Num num = (Num) bufferedDeleteTerms.get(term);
2698: if (num == null) {
2699: bufferedDeleteTerms.put(term, new Num(docCount));
2700: // This is a coarse approximation of the actual bytes used:
2701: numBytesUsed += (term.field().length() + term.text()
2702: .length())
2703: * BYTES_PER_CHAR
2704: + 4
2705: + 5
2706: * OBJECT_HEADER_BYTES
2707: + 5
2708: * OBJECT_POINTER_BYTES;
2709: if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
2710: && numBytesUsed > ramBufferSize) {
2711: bufferIsFull = true;
2712: }
2713: } else {
2714: num.setNum(docCount);
2715: }
2716: numBufferedDeleteTerms++;
2717: }
2718:
2719: // Buffer a specific docID for deletion. Currently only
2720: // used when we hit an exception when adding a document
2721: synchronized private void addDeleteDocID(int docId) {
2722: bufferedDeleteDocIDs.add(new Integer(docId));
2723: numBytesUsed += OBJECT_HEADER_BYTES + BYTES_PER_INT
2724: + OBJECT_POINTER_BYTES;
2725: }
2726:
2727: /** Does the synchronized work to finish/flush the
2728: * inverted document. */
2729: private synchronized void finishDocument(ThreadState state)
2730: throws IOException, AbortException {
2731: if (abortCount > 0) {
2732: // Forcefully idle this threadstate -- its state will
2733: // be reset by abort()
2734: state.isIdle = true;
2735: notifyAll();
2736: return;
2737: }
2738:
2739: // Now write the indexed document to the real files.
2740: if (nextWriteDocID == state.docID) {
2741: // It's my turn, so write everything now:
2742: nextWriteDocID++;
2743: state.writeDocument();
2744: state.isIdle = true;
2745: notifyAll();
2746:
2747: // If any states were waiting on me, sweep through and
2748: // flush those that are enabled by my write.
2749: if (numWaiting > 0) {
2750: boolean any = true;
2751: while (any) {
2752: any = false;
2753: for (int i = 0; i < numWaiting;) {
2754: final ThreadState s = waitingThreadStates[i];
2755: if (s.docID == nextWriteDocID) {
2756: s.writeDocument();
2757: s.isIdle = true;
2758: nextWriteDocID++;
2759: any = true;
2760: if (numWaiting > i + 1)
2761: // Swap in the last waiting state to fill in
2762: // the hole we just created. It's important
2763: // to do this as-we-go and not at the end of
2764: // the loop, because if we hit an aborting
2765: // exception in one of the s.writeDocument
2766: // calls (above), it leaves this array in an
2767: // inconsistent state:
2768: waitingThreadStates[i] = waitingThreadStates[numWaiting - 1];
2769: numWaiting--;
2770: } else {
2771: assert !s.isIdle;
2772: i++;
2773: }
2774: }
2775: }
2776: }
2777: } else {
2778: // Another thread got a docID before me, but, it
2779: // hasn't finished its processing. So add myself to
2780: // the line but don't hold up this thread.
2781: waitingThreadStates[numWaiting++] = state;
2782: }
2783: }
2784:
2785: long getRAMUsed() {
2786: return numBytesUsed;
2787: }
2788:
2789: long numBytesAlloc;
2790: long numBytesUsed;
2791:
2792: NumberFormat nf = NumberFormat.getInstance();
2793:
2794: /* Used only when writing norms to fill in default norm
2795: * value into the holes in docID stream for those docs
2796: * that didn't have this field. */
2797: static void fillBytes(IndexOutput out, byte b, int numBytes)
2798: throws IOException {
2799: for (int i = 0; i < numBytes; i++)
2800: out.writeByte(b);
2801: }
2802:
2803: byte[] copyByteBuffer = new byte[4096];
2804:
2805: /** Copy numBytes from srcIn to destIn */
2806: void copyBytes(IndexInput srcIn, IndexOutput destIn, long numBytes)
2807: throws IOException {
2808: // TODO: we could do this more efficiently (save a copy)
2809: // because it's always from a ByteSliceReader ->
2810: // IndexOutput
2811: while (numBytes > 0) {
2812: final int chunk;
2813: if (numBytes > 4096)
2814: chunk = 4096;
2815: else
2816: chunk = (int) numBytes;
2817: srcIn.readBytes(copyByteBuffer, 0, chunk);
2818: destIn.writeBytes(copyByteBuffer, 0, chunk);
2819: numBytes -= chunk;
2820: }
2821: }
2822:
2823: /* Stores norms, buffered in RAM, until they are flushed
2824: * to a partial segment. */
2825: private static class BufferedNorms {
2826:
2827: RAMOutputStream out;
2828: int upto;
2829:
2830: BufferedNorms() {
2831: out = new RAMOutputStream();
2832: }
2833:
2834: void add(float norm) throws IOException {
2835: byte b = Similarity.encodeNorm(norm);
2836: out.writeByte(b);
2837: upto++;
2838: }
2839:
2840: void reset() {
2841: out.reset();
2842: upto = 0;
2843: }
2844:
2845: void fill(int docID) throws IOException {
2846: // Must now fill in docs that didn't have this
2847: // field. Note that this is how norms can consume
2848: // tremendous storage when the docs have widely
2849: // varying sets of fields, because we are not
2850: // storing the norms sparsely (see LUCENE-830).
2851: if (upto < docID) {
2852: fillBytes(out, defaultNorm, docID - upto);
2853: upto = docID;
2854: }
2855: }
2856: }
2857:
2858: /* Simple StringReader that can be reset to a new string;
2859: * we use this when tokenizing the string value from a
2860: * Field. */
2861: private final static class ReusableStringReader extends Reader {
2862: int upto;
2863: int left;
2864: String s;
2865:
2866: void init(String s) {
2867: this.s = s;
2868: left = s.length();
2869: this.upto = 0;
2870: }
2871:
2872: public int read(char[] c) {
2873: return read(c, 0, c.length);
2874: }
2875:
2876: public int read(char[] c, int off, int len) {
2877: if (left > len) {
2878: s.getChars(upto, upto + len, c, off);
2879: upto += len;
2880: left -= len;
2881: return len;
2882: } else if (0 == left) {
2883: return -1;
2884: } else {
2885: s.getChars(upto, upto + left, c, off);
2886: int r = left;
2887: left = 0;
2888: upto = s.length();
2889: return r;
2890: }
2891: }
2892:
2893: public void close() {
2894: }
2895: }
2896:
2897: /* IndexInput that knows how to read the byte slices written
2898: * by Posting and PostingVector. We read the bytes in
2899: * each slice until we hit the end of that slice at which
2900: * point we read the forwarding address of the next slice
2901: * and then jump to it.*/
2902: private final static class ByteSliceReader extends IndexInput {
2903: ByteBlockPool pool;
2904: int bufferUpto;
2905: byte[] buffer;
2906: public int upto;
2907: int limit;
2908: int level;
2909: public int bufferOffset;
2910:
2911: public int endIndex;
2912:
2913: public void init(ByteBlockPool pool, int startIndex,
2914: int endIndex) {
2915:
2916: assert endIndex - startIndex > 0;
2917:
2918: this.pool = pool;
2919: this.endIndex = endIndex;
2920:
2921: level = 0;
2922: bufferUpto = startIndex / BYTE_BLOCK_SIZE;
2923: bufferOffset = bufferUpto * BYTE_BLOCK_SIZE;
2924: buffer = pool.buffers[bufferUpto];
2925: upto = startIndex & BYTE_BLOCK_MASK;
2926:
2927: final int firstSize = levelSizeArray[0];
2928:
2929: if (startIndex + firstSize >= endIndex) {
2930: // There is only this one slice to read
2931: limit = endIndex & BYTE_BLOCK_MASK;
2932: } else
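// Not the only slice: stop 4 bytes early, where the forwarding
// address to the next slice begins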
2933: limit = upto + firstSize - 4;
2934: }
2935:
2936: public byte readByte() {
2937: // Assert that we are not @ EOF
2938: assert upto + bufferOffset < endIndex;
2939: if (upto == limit)
2940: nextSlice();
2941: return buffer[upto++];
2942: }
2943:
2944: public long writeTo(IndexOutput out) throws IOException {
2945: long size = 0;
2946: while (true) {
2947: if (limit + bufferOffset == endIndex) {
2948: assert endIndex - bufferOffset >= upto;
2949: out.writeBytes(buffer, upto, limit - upto);
2950: size += limit - upto;
2951: break;
2952: } else {
2953: out.writeBytes(buffer, upto, limit - upto);
2954: size += limit - upto;
2955: nextSlice();
2956: }
2957: }
2958:
2959: return size;
2960: }
2961:
2962: public void nextSlice() {
2963:
2964: // Skip to our next slice
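// (the last 4 bytes of a non-final slice hold the big-endian
// address of the next slice)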
2965: final int nextIndex = ((buffer[limit] & 0xff) << 24)
2966: + ((buffer[1 + limit] & 0xff) << 16)
2967: + ((buffer[2 + limit] & 0xff) << 8)
2968: + (buffer[3 + limit] & 0xff);
2969:
2970: level = nextLevelArray[level];
2971: final int newSize = levelSizeArray[level];
2972:
2973: bufferUpto = nextIndex / BYTE_BLOCK_SIZE;
2974: bufferOffset = bufferUpto * BYTE_BLOCK_SIZE;
2975:
2976: buffer = pool.buffers[bufferUpto];
2977: upto = nextIndex & BYTE_BLOCK_MASK;
2978:
2979: if (nextIndex + newSize >= endIndex) {
2980: // We are advancing to the final slice
2981: assert endIndex - nextIndex > 0;
2982: limit = endIndex - bufferOffset;
2983: } else {
2984: // This is not the final slice (subtract 4 for the
2985: // forwarding address at the end of this new slice)
2986: limit = upto + newSize - 4;
2987: }
2988: }
2989:
2990: public void readBytes(byte[] b, int offset, int len) {
2991: while (len > 0) {
2992: final int numLeft = limit - upto;
2993: if (numLeft < len) {
2994: // Read entire slice
2995: System.arraycopy(buffer, upto, b, offset, numLeft);
2996: offset += numLeft;
2997: len -= numLeft;
2998: nextSlice();
2999: } else {
3000: // The rest of the request fits in this slice
3001: System.arraycopy(buffer, upto, b, offset, len);
3002: upto += len;
3003: break;
3004: }
3005: }
3006: }
3007:
3008: public long getFilePointer() {
3009: throw new RuntimeException("not implemented");
3010: }
3011:
3012: public long length() {
3013: throw new RuntimeException("not implemented");
3014: }
3015:
3016: public void seek(long pos) {
3017: throw new RuntimeException("not implemented");
3018: }
3019:
3020: public void close() {
3021: throw new RuntimeException("not implemented");
3022: }
3023: }
3024:
3025: // Size of each slice. These arrays should be at most 16
3026: // elements since the level is encoded in 4 bits. The first
3027: // array maps each level to the next level (a compact way
3028: // to encode X+1 with a max); the second array is the length
3029: // of each slice, ie 5 bytes for the first, 14 for the next, etc.
3030: final static int[] nextLevelArray = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 9 };
3031: final static int[] levelSizeArray = { 5, 14, 20, 30, 40, 40, 80,
3032: 80, 120, 200 };
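// Example: successive slices for one posting stream are 5, 14, 20,
// 30, 40, 40, 80, 80, 120, 200 bytes, and 200 bytes for every slice
// after that (the level saturates at 9).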
3033:
3034: /* Class that Posting and PostingVector use to write byte
3035: * streams into shared fixed-size byte[] arrays. The idea
3036: * is to allocate slices of increasing lengths. For
3037: * example, the first slice is 5 bytes, the next slice is
3038: * 14, etc. We start by writing our bytes into the first
3039: * 5 bytes. When we hit the end of the slice, we allocate
3040: * the next slice and then write the address of the new
3041: * slice into the last 4 bytes of the previous slice (the
3042: * "forwarding address").
3043: *
3044: * Each slice is filled with 0's initially, and we mark
3045: * the end with a non-zero byte. This way the methods
3046: * that are writing into the slice don't need to record
3047: * its length and instead allocate a new slice once they
3048: * hit a non-zero byte. */
3049: private final class ByteBlockPool {
3050:
3051: public byte[][] buffers = new byte[10][];
3052:
3053: int bufferUpto = -1; // Which buffer we are upto
3054: public int byteUpto = BYTE_BLOCK_SIZE; // Where we are in head buffer
3055:
3056: public byte[] buffer; // Current head buffer
3057: public int byteOffset = -BYTE_BLOCK_SIZE; // Current head offset
3058:
3059: public void reset() {
3060: if (bufferUpto != -1) {
3061: // We allocated at least one buffer
3062:
3063: for (int i = 0; i < bufferUpto; i++)
3064: // Fully zero fill buffers that we fully used
3065: Arrays.fill(buffers[i], (byte) 0);
3066:
3067: // Partial zero fill the final buffer
3068: Arrays.fill(buffers[bufferUpto], 0, byteUpto, (byte) 0);
3069:
3070: if (bufferUpto > 0)
3071: // Recycle all but the first buffer
3072: recycleByteBlocks(buffers, 1, 1 + bufferUpto);
3073:
3074: // Re-use the first buffer
3075: bufferUpto = 0;
3076: byteUpto = 0;
3077: byteOffset = 0;
3078: buffer = buffers[0];
3079: }
3080: }
3081:
3082: public void nextBuffer() {
3083: if (1 + bufferUpto == buffers.length) {
3084: byte[][] newBuffers = new byte[(int) (buffers.length * 1.5)][];
3085: System.arraycopy(buffers, 0, newBuffers, 0,
3086: buffers.length);
3087: buffers = newBuffers;
3088: }
3089: buffer = buffers[1 + bufferUpto] = getByteBlock();
3090: bufferUpto++;
3091:
3092: byteUpto = 0;
3093: byteOffset += BYTE_BLOCK_SIZE;
3094: }
3095:
3096: public int newSlice(final int size) {
3097: if (byteUpto > BYTE_BLOCK_SIZE - size)
3098: nextBuffer();
3099: final int upto = byteUpto;
3100: byteUpto += size;
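// Mark the last byte of the new slice with a non-zero end marker
// (16 = level 0); writers switch to a new slice (allocSlice) when
// they hit a non-zero byte.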
3101: buffer[byteUpto - 1] = 16;
3102: return upto;
3103: }
3104:
3105: public int allocSlice(final byte[] slice, final int upto) {
3106:
3107: final int level = slice[upto] & 15;
3108: final int newLevel = nextLevelArray[level];
3109: final int newSize = levelSizeArray[newLevel];
3110:
3111: // Maybe allocate another block
3112: if (byteUpto > BYTE_BLOCK_SIZE - newSize)
3113: nextBuffer();
3114:
3115: final int newUpto = byteUpto;
3116: final int offset = newUpto + byteOffset;
3117: byteUpto += newSize;
3118:
3119: // Copy forward the last 3 bytes of the old slice (which
3120: // we are about to overwrite with the forwarding address):
3121: buffer[newUpto] = slice[upto - 3];
3122: buffer[newUpto + 1] = slice[upto - 2];
3123: buffer[newUpto + 2] = slice[upto - 1];
3124:
3125: // Write forwarding address at end of last slice:
3126: slice[upto - 3] = (byte) (offset >>> 24);
3127: slice[upto - 2] = (byte) (offset >>> 16);
3128: slice[upto - 1] = (byte) (offset >>> 8);
3129: slice[upto] = (byte) offset;
3130:
3131: // Write new level:
3132: buffer[byteUpto - 1] = (byte) (16 | newLevel);
3133:
3134: return newUpto + 3;
3135: }
3136: }
3137:
3138: private final class CharBlockPool {
3139:
3140: public char[][] buffers = new char[10][];
3141: int numBuffer;
3142:
3143: int bufferUpto = -1; // Which buffer we are upto
3144: public int byteUpto = CHAR_BLOCK_SIZE; // Where we are in head buffer
3145:
3146: public char[] buffer; // Current head buffer
3147: public int byteOffset = -CHAR_BLOCK_SIZE; // Current head offset
3148:
3149: public void reset() {
3150: recycleCharBlocks(buffers, 1 + bufferUpto);
3151: bufferUpto = -1;
3152: byteUpto = CHAR_BLOCK_SIZE;
3153: byteOffset = -CHAR_BLOCK_SIZE;
3154: }
3155:
3156: public void nextBuffer() {
3157: if (1 + bufferUpto == buffers.length) {
3158: char[][] newBuffers = new char[(int) (buffers.length * 1.5)][];
3159: System.arraycopy(buffers, 0, newBuffers, 0,
3160: buffers.length);
3161: buffers = newBuffers;
3162: }
3163: buffer = buffers[1 + bufferUpto] = getCharBlock();
3164: bufferUpto++;
3165:
3166: byteUpto = 0;
3167: byteOffset += CHAR_BLOCK_SIZE;
3168: }
3169: }
3170:
3171: // Used only when infoStream != null
3172: private long segmentSize(String segmentName) throws IOException {
3173: assert infoStream != null;
3174:
3175: long size = directory.fileLength(segmentName + ".tii")
3176: + directory.fileLength(segmentName + ".tis")
3177: + directory.fileLength(segmentName + ".frq")
3178: + directory.fileLength(segmentName + ".prx");
3179:
3180: final String normFileName = segmentName + ".nrm";
3181: if (directory.fileExists(normFileName))
3182: size += directory.fileLength(normFileName);
3183:
3184: return size;
3185: }
3186:
3187: final private static int POINTER_NUM_BYTE = 4;
3188: final private static int INT_NUM_BYTE = 4;
3189: final private static int CHAR_NUM_BYTE = 2;
3190:
3191: // Why + 5*POINTER_NUM_BYTE below?
3192: // 1: Posting has "vector" field which is a pointer
3193: // 2: Posting is referenced by postingsFreeList array
3194: // 3,4,5: Posting is referenced by postings hash, which
3195: // targets 25-50% fill factor; approximate this
3196: // as 3X # pointers
3197: final static int POSTING_NUM_BYTE = OBJECT_HEADER_BYTES + 9
3198: * INT_NUM_BYTE + 5 * POINTER_NUM_BYTE;
3199:
3200: // Holds free pool of Posting instances
3201: private Posting[] postingsFreeList;
3202: private int postingsFreeCount;
3203: private int postingsAllocCount;
3204:
3205: /* Allocate more Postings from shared pool */
3206: synchronized void getPostings(Posting[] postings) {
3207: numBytesUsed += postings.length * POSTING_NUM_BYTE;
3208: final int numToCopy;
3209: if (postingsFreeCount < postings.length)
3210: numToCopy = postingsFreeCount;
3211: else
3212: numToCopy = postings.length;
3213: final int start = postingsFreeCount - numToCopy;
3214: System.arraycopy(postingsFreeList, start, postings, 0,
3215: numToCopy);
3216: postingsFreeCount -= numToCopy;
3217:
3218: // Directly allocate the remainder if any
3219: if (numToCopy < postings.length) {
3220: final int extra = postings.length - numToCopy;
3221: final int newPostingsAllocCount = postingsAllocCount
3222: + extra;
3223: if (newPostingsAllocCount > postingsFreeList.length)
3224: postingsFreeList = new Posting[(int) (1.25 * newPostingsAllocCount)];
3225:
3226: balanceRAM();
3227: for (int i = numToCopy; i < postings.length; i++) {
3228: postings[i] = new Posting();
3229: numBytesAlloc += POSTING_NUM_BYTE;
3230: postingsAllocCount++;
3231: }
3232: }
3233: }
3234:
3235: synchronized void recyclePostings(Posting[] postings,
3236: int numPostings) {
3237: // Move all Postings from this ThreadState back to our
3238: // free list. We pre-allocated this array while we were
3239: // creating Postings to make sure it's large enough
3240: assert postingsFreeCount + numPostings <= postingsFreeList.length;
3241: System.arraycopy(postings, 0, postingsFreeList,
3242: postingsFreeCount, numPostings);
3243: postingsFreeCount += numPostings;
3244: }
3245:
3246: /* Initial chunk size of the shared byte[] blocks used to
3247: store postings data */
3248: final static int BYTE_BLOCK_SHIFT = 15;
3249: final static int BYTE_BLOCK_SIZE = (int) Math.pow(2.0,
3250: BYTE_BLOCK_SHIFT);
3251: final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
3252: final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
3253:
3254: private ArrayList freeByteBlocks = new ArrayList();
3255:
3256: /* Allocate another byte[] from the shared pool */
3257: synchronized byte[] getByteBlock() {
3258: final int size = freeByteBlocks.size();
3259: final byte[] b;
3260: if (0 == size) {
3261: numBytesAlloc += BYTE_BLOCK_SIZE;
3262: balanceRAM();
3263: b = new byte[BYTE_BLOCK_SIZE];
3264: } else
3265: b = (byte[]) freeByteBlocks.remove(size - 1);
3266: numBytesUsed += BYTE_BLOCK_SIZE;
3267: return b;
3268: }
3269:
3270: /* Return a byte[] to the pool */
3271: synchronized void recycleByteBlocks(byte[][] blocks, int start,
3272: int end) {
3273: for (int i = start; i < end; i++)
3274: freeByteBlocks.add(blocks[i]);
3275: }
3276:
3277: /* Initial chunk size of the shared char[] blocks used to
3278: store term text */
3279: final static int CHAR_BLOCK_SHIFT = 14;
3280: final static int CHAR_BLOCK_SIZE = (int) Math.pow(2.0,
3281: CHAR_BLOCK_SHIFT);
3282: final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
3283:
3284: final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE - 1;
3285:
3286: private ArrayList freeCharBlocks = new ArrayList();
3287:
3288: /* Allocate another char[] from the shared pool */
3289: synchronized char[] getCharBlock() {
3290: final int size = freeCharBlocks.size();
3291: final char[] c;
3292: if (0 == size) {
3293: numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
3294: balanceRAM();
3295: c = new char[CHAR_BLOCK_SIZE];
3296: } else
3297: c = (char[]) freeCharBlocks.remove(size - 1);
3298: numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
3299: return c;
3300: }
3301:
3302: /* Return a char[] to the pool */
3303: synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
3304: for (int i = 0; i < numBlocks; i++)
3305: freeCharBlocks.add(blocks[i]);
3306: }
3307:
3308: String toMB(long v) {
3309: return nf.format(v / 1024. / 1024.);
3310: }
3311:
3312: /* We have three pools of RAM: Postings, byte blocks
3313: * (holds freq/prox posting data) and char blocks (holds
3314: * characters in the term). Different docs require
3315: * varying amount of storage from these three classes.
3316: * For example, docs with many unique single-occurrence
3317: * short terms will use up the Postings RAM and hardly any
3318: * of the other two. Whereas docs with very large terms
3319: * will use a lot of char block RAM and relatively less of
3320: * the other two. This method just frees allocations from
3321: * the pools once we are over-budget, which balances the
3322: * pools to match the current docs. */
3323: private synchronized void balanceRAM() {
3324:
3325: if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH
3326: || bufferIsFull)
3327: return;
3328:
3329: // We free our allocations if we've allocated 5% over
3330: // our allowed RAM buffer
3331: final long freeTrigger = (long) (1.05 * ramBufferSize);
3332: final long freeLevel = (long) (0.95 * ramBufferSize);
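// For example, with a 16 MB ramBufferSize, freeing kicks in once
// allocations exceed about 16.8 MB and continues until they drop
// below about 15.2 MB.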
3333:
3334: // We flush when we've used our target usage
3335: final long flushTrigger = (long) ramBufferSize;
3336:
3337: if (numBytesAlloc > freeTrigger) {
3338: if (infoStream != null)
3339: infoStream
3340: .println(" RAM: now balance allocations: usedMB="
3341: + toMB(numBytesUsed)
3342: + " vs trigger="
3343: + toMB(flushTrigger)
3344: + " allocMB="
3345: + toMB(numBytesAlloc)
3346: + " vs trigger="
3347: + toMB(freeTrigger)
3348: + " postingsFree="
3349: + toMB(postingsFreeCount
3350: * POSTING_NUM_BYTE)
3351: + " byteBlockFree="
3352: + toMB(freeByteBlocks.size()
3353: * BYTE_BLOCK_SIZE)
3354: + " charBlockFree="
3355: + toMB(freeCharBlocks.size()
3356: * CHAR_BLOCK_SIZE
3357: * CHAR_NUM_BYTE));
3358:
3359: // We have crossed the free trigger (105% of the allowed
3360: // RAM buffer); free allocations until we are back down
3361: // to 95% (freeLevel)
3362: final long startBytesAlloc = numBytesAlloc;
3363:
3364: final int postingsFreeChunk = (int) (BYTE_BLOCK_SIZE / POSTING_NUM_BYTE);
3365:
3366: int iter = 0;
3367:
3368: // We free round-robin from each pool, in chunks of
3369: // roughly one block (~32 KB), until we are below our
3370: // threshold (freeLevel)
3371:
3372: while (numBytesAlloc > freeLevel) {
3373: if (0 == freeByteBlocks.size()
3374: && 0 == freeCharBlocks.size()
3375: && 0 == postingsFreeCount) {
3376: // Nothing else to free -- must flush now.
3377: bufferIsFull = true;
3378: if (infoStream != null)
3379: infoStream
3380: .println(" nothing to free; now set bufferIsFull");
3381: break;
3382: }
3383:
3384: if ((0 == iter % 3) && freeByteBlocks.size() > 0) {
3385: freeByteBlocks.remove(freeByteBlocks.size() - 1);
3386: numBytesAlloc -= BYTE_BLOCK_SIZE;
3387: }
3388:
3389: if ((1 == iter % 3) && freeCharBlocks.size() > 0) {
3390: freeCharBlocks.remove(freeCharBlocks.size() - 1);
3391: numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
3392: }
3393:
3394: if ((2 == iter % 3) && postingsFreeCount > 0) {
3395: final int numToFree;
3396: if (postingsFreeCount >= postingsFreeChunk)
3397: numToFree = postingsFreeChunk;
3398: else
3399: numToFree = postingsFreeCount;
3400: Arrays.fill(postingsFreeList, postingsFreeCount
3401: - numToFree, postingsFreeCount, null);
3402: postingsFreeCount -= numToFree;
3403: postingsAllocCount -= numToFree;
3404: numBytesAlloc -= numToFree * POSTING_NUM_BYTE;
3405: }
3406:
3407: iter++;
3408: }
3409:
3410: if (infoStream != null)
3411: infoStream
3412: .println(" after free: freedMB="
3413: + nf
3414: .format((startBytesAlloc - numBytesAlloc) / 1024. / 1024.)
3415: + " usedMB="
3416: + nf
3417: .format(numBytesUsed / 1024. / 1024.)
3418: + " allocMB="
3419: + nf
3420: .format(numBytesAlloc / 1024. / 1024.));
3421:
3422: } else {
3423: // Allocations have not crossed the free trigger; but if
3424: // the RAM we are actually using has crossed the flush
3425: // trigger, go ahead and flush. This prevents
3426: // over-allocating and then freeing, with every
3427: // flush.
3428: if (numBytesUsed > flushTrigger) {
3429: if (infoStream != null)
3430: infoStream.println(" RAM: now flush @ usedMB="
3431: + nf.format(numBytesUsed / 1024. / 1024.)
3432: + " allocMB="
3433: + nf.format(numBytesAlloc / 1024. / 1024.)
3434: + " triggerMB="
3435: + nf.format(flushTrigger / 1024. / 1024.));
3436:
3437: bufferIsFull = true;
3438: }
3439: }
3440: }
3441:
3442: /* Used to track postings for a single term. One of these
3443: * exists per unique term seen since the last flush. */
3444: private final static class Posting {
3445: int textStart; // Address into char[] blocks where our text is stored
3446: int docFreq; // # times this term occurs in the current doc
3447: int freqStart; // Address of first byte[] slice for freq
3448: int freqUpto; // Next write address for freq
3449: int proxStart; // Address of first byte[] slice
3450: int proxUpto; // Next write address for prox
3451: int lastDocID; // Last docID where this term occurred
3452: int lastDocCode; // Code for prior doc
3453: int lastPosition; // Last position where this term occurred
3454: PostingVector vector; // Corresponding PostingVector instance
3455: }
3456:
3457: /* Used to track data for term vectors. One of these
3458: * exists per unique term seen in each field in the
3459: * document. */
3460: private final static class PostingVector {
3461: Posting p; // Corresponding Posting instance for this term
3462: int lastOffset; // Last offset we saw
3463: int offsetStart; // Address of first slice for offsets
3464: int offsetUpto; // Next write address for offsets
3465: int posStart; // Address of first slice for positions
3466: int posUpto; // Next write address for positions
3467: }
3468: }
3469:
3470: // Used only internally to DW to call abort "up the stack"
3471: class AbortException extends IOException {
3472: public AbortException(Throwable cause, DocumentsWriter docWriter) {
3473: super();
3474: initCause(cause);
3475: docWriter.setAborting();
3476: }
3477: }