001: package org.apache.lucene.index;
002:
003: /**
004: * Licensed to the Apache Software Foundation (ASF) under one or more
005: * contributor license agreements. See the NOTICE file distributed with
006: * this work for additional information regarding copyright ownership.
007: * The ASF licenses this file to You under the Apache License, Version 2.0
008: * (the "License"); you may not use this file except in compliance with
009: * the License. You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018: */
019:
020: import org.apache.lucene.store.Directory;
021:
022: import java.io.IOException;
023: import java.io.FileNotFoundException;
024: import java.io.PrintStream;
025: import java.util.Map;
026: import java.util.HashMap;
027: import java.util.Iterator;
028: import java.util.List;
029: import java.util.ArrayList;
030: import java.util.Collections;
031: import java.util.Collection;
032:
033: /*
034: * This class keeps track of each SegmentInfos instance that
035: * is still "live", either because it corresponds to a
036: * segments_N file in the Directory (a "commit", i.e. a
037: * committed SegmentInfos) or because it's the in-memory SegmentInfos
038: * that a writer is actively updating but has not yet committed
039: * (currently this only applies when autoCommit=false in IndexWriter).
040: * This class uses simple reference counting to map the live
041: * SegmentInfos instances to individual files in the Directory.
042: *
043: * The same directory file may be referenced by more than
044: * one IndexCommitPoint, i.e. by more than one SegmentInfos.
045: * Therefore we count how many commits reference each file.
046: * When all the commits referencing a certain file have been
047: * deleted, the refcount for that file becomes zero, and the
048: * file is deleted.
049: *
050: * A separate deletion policy interface
051: * (IndexDeletionPolicy) is consulted on creation (onInit)
052: * and once per commit (onCommit), to decide when a commit
053: * should be removed.
054: *
055: * It is the business of the IndexDeletionPolicy to choose
056: * when to delete commit points. The actual mechanics of
057: * file deletion, retrying, etc, derived from the deletion
058: * of commit points is the business of the IndexFileDeleter.
059: *
060: * The current default deletion policy is {@link
061: * KeepOnlyLastCommitDeletionPolicy}, which removes all
062: * prior commits when a new commit has completed. This
063: * matches the behavior before 2.2.
064: *
065: * Note that you must hold the write.lock before
066: * instantiating this class. It opens segments_N file(s)
067: * directly with no retry logic.
068: */
069:
070: final class IndexFileDeleter {
071:
072: /* Files that we tried to delete but failed (likely
073: * because they are open and we are running on Windows),
074: * so we will retry them again later: */
075: private List deletable;
076:
077: /* Reference count for all files in the index.
078: * Counts how many existing commits reference a file.
079: * Maps String to RefCount (class below) instances: */
080: private Map refCounts = new HashMap();
081:
082: /* Holds all commits (segments_N) currently in the index.
083: * This will have just 1 commit if you are using the
084: * default delete policy (KeepOnlyLastCommitDeletionPolicy).
085: * Other policies may leave commit points live for longer
086: * in which case this list would be longer than 1: */
087: private List commits = new ArrayList();
088:
089: /* Holds files we had incref'd from the previous
090: * non-commit checkpoint: */
091: private List lastFiles = new ArrayList();
092:
093: /* Commits that the IndexDeletionPolicy have decided to delete: */
094: private List commitsToDelete = new ArrayList();
095:
096: private PrintStream infoStream;
097: private Directory directory;
098: private IndexDeletionPolicy policy;
099: private DocumentsWriter docWriter;
100:
101: /** Change to true to see details of reference counts when
102: * infoStream != null */
103: public static boolean VERBOSE_REF_COUNTS = false;
104:
105: void setInfoStream(PrintStream infoStream) {
106: this .infoStream = infoStream;
107: if (infoStream != null)
108: message("setInfoStream deletionPolicy=" + policy);
109: }
110:
111: private void message(String message) {
112: infoStream.println("IFD [" + Thread.currentThread().getName()
113: + "]: " + message);
114: }
115:
116: /**
117: * Initialize the deleter: find all previous commits in
118: * the Directory, incref the files they reference, call
119: * the policy to let it delete commits. The incoming
120: * segmentInfos must have been loaded from a commit point
121: * and not yet modified. This will remove any files not
122: * referenced by any of the commits.
123: * @throws CorruptIndexException if the index is corrupt
124: * @throws IOException if there is a low-level IO error
125: */
126: public IndexFileDeleter(Directory directory,
127: IndexDeletionPolicy policy, SegmentInfos segmentInfos,
128: PrintStream infoStream, DocumentsWriter docWriter)
129: throws CorruptIndexException, IOException {
130:
131: this .docWriter = docWriter;
132: this .infoStream = infoStream;
133:
134: if (infoStream != null)
135: message("init: current segments file is \""
136: + segmentInfos.getCurrentSegmentFileName()
137: + "\"; deletionPolicy=" + policy);
138:
139: this .policy = policy;
140: this .directory = directory;
141:
142: // First pass: walk the files and initialize our ref
143: // counts:
144: long currentGen = segmentInfos.getGeneration();
145: IndexFileNameFilter filter = IndexFileNameFilter.getFilter();
146:
147: String[] files = directory.list();
148: if (files == null)
149: throw new IOException("cannot read directory " + directory
150: + ": list() returned null");
151:
152: CommitPoint currentCommitPoint = null;
153:
154: for (int i = 0; i < files.length; i++) {
155:
156: String fileName = files[i];
157:
158: if (filter.accept(null, fileName)
159: && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
160:
161: // Add this file to refCounts with initial count 0:
162: getRefCount(fileName);
163:
164: if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
165:
166: // This is a commit (segments or segments_N), and
167: // it's valid (<= the max gen). Load it, then
168: // incref all files it refers to:
169: if (SegmentInfos
170: .generationFromSegmentsFileName(fileName) <= currentGen) {
171: if (infoStream != null) {
172: message("init: load commit \"" + fileName
173: + "\"");
174: }
175: SegmentInfos sis = new SegmentInfos();
176: try {
177: sis.read(directory, fileName);
178: } catch (FileNotFoundException e) {
179: // LUCENE-948: on NFS (and maybe others), if
180: // you have writers switching back and forth
181: // between machines, it's very likely that the
182: // dir listing will be stale and will claim a
183: // file segments_X exists when in fact it
184: // doesn't. So, we catch this and handle it
185: // as if the file does not exist
186: if (infoStream != null) {
187: message("init: hit FileNotFoundException when loading commit \""
188: + fileName
189: + "\"; skipping this commit point");
190: }
191: sis = null;
192: }
193: if (sis != null) {
194: CommitPoint commitPoint = new CommitPoint(
195: sis);
196: if (sis.getGeneration() == segmentInfos
197: .getGeneration()) {
198: currentCommitPoint = commitPoint;
199: }
200: commits.add(commitPoint);
201: incRef(sis, true);
202: }
203: }
204: }
205: }
206: }
207:
208: if (currentCommitPoint == null) {
209: // We did not in fact see the segments_N file
210: // corresponding to the segmentInfos that was passed
211: // in. Yet, it must exist, because our caller holds
212: // the write lock. This can happen when the directory
213: // listing was stale (eg when index accessed via NFS
214: // client with stale directory listing cache). So we
215: // try now to explicitly open this commit point:
216: SegmentInfos sis = new SegmentInfos();
217: try {
218: sis.read(directory, segmentInfos
219: .getCurrentSegmentFileName());
220: } catch (IOException e) {
221: throw new CorruptIndexException(
222: "failed to locate current segments_N file");
223: }
224: if (infoStream != null)
225: message("forced open of current segments file "
226: + segmentInfos.getCurrentSegmentFileName());
227: currentCommitPoint = new CommitPoint(sis);
228: commits.add(currentCommitPoint);
229: incRef(sis, true);
230: }
231:
232: // We keep commits list in sorted order (oldest to newest):
233: Collections.sort(commits);
234:
235: // Now delete anything with ref count at 0. These are
236: // presumably abandoned files eg due to crash of
237: // IndexWriter.
238: Iterator it = refCounts.keySet().iterator();
239: while (it.hasNext()) {
240: String fileName = (String) it.next();
241: RefCount rc = (RefCount) refCounts.get(fileName);
242: if (0 == rc.count) {
243: if (infoStream != null) {
244: message("init: removing unreferenced file \""
245: + fileName + "\"");
246: }
247: deleteFile(fileName);
248: }
249: }
250:
251: // Finally, give policy a chance to remove things on
252: // startup:
253: policy.onInit(commits);
254:
255: // It's OK for the onInit to remove the current commit
256: // point; we just have to checkpoint our in-memory
257: // SegmentInfos to protect those files that it uses:
258: if (currentCommitPoint.deleted) {
259: checkpoint(segmentInfos, false);
260: }
261:
262: deleteCommits();
263: }
264:
265: /**
266: * Remove the CommitPoints in the commitsToDelete List by
267: * DecRef'ing all files from each SegmentInfos.
268: */
269: private void deleteCommits() throws IOException {
270:
271: int size = commitsToDelete.size();
272:
273: if (size > 0) {
274:
275: // First decref all files that had been referred to by
276: // the now-deleted commits:
277: for (int i = 0; i < size; i++) {
278: CommitPoint commit = (CommitPoint) commitsToDelete
279: .get(i);
280: if (infoStream != null) {
281: message("deleteCommits: now remove commit \""
282: + commit.getSegmentsFileName() + "\"");
283: }
284: int size2 = commit.files.size();
285: for (int j = 0; j < size2; j++) {
286: decRef((String) commit.files.get(j));
287: }
288: }
289: commitsToDelete.clear();
290:
291: // Now compact commits to remove deleted ones (preserving the sort):
292: size = commits.size();
293: int readFrom = 0;
294: int writeTo = 0;
295: while (readFrom < size) {
296: CommitPoint commit = (CommitPoint) commits
297: .get(readFrom);
298: if (!commit.deleted) {
299: if (writeTo != readFrom) {
300: commits.set(writeTo, commits.get(readFrom));
301: }
302: writeTo++;
303: }
304: readFrom++;
305: }
306:
307: while (size > writeTo) {
308: commits.remove(size - 1);
309: size--;
310: }
311: }
312: }
313:
314: /**
315: * Writer calls this when it has hit an error and had to
316: * roll back, to tell us that there may now be
317: * unreferenced files in the filesystem. So we re-list
318: * the filesystem and delete such files. If segmentName
319: * is non-null, we will only delete files corresponding to
320: * that segment.
321: */
322: public void refresh(String segmentName) throws IOException {
323: String[] files = directory.list();
324: if (files == null)
325: throw new IOException("cannot read directory " + directory
326: + ": list() returned null");
327: IndexFileNameFilter filter = IndexFileNameFilter.getFilter();
328: String segmentPrefix1;
329: String segmentPrefix2;
330: if (segmentName != null) {
331: segmentPrefix1 = segmentName + ".";
332: segmentPrefix2 = segmentName + "_";
333: } else {
334: segmentPrefix1 = null;
335: segmentPrefix2 = null;
336: }
337:
338: for (int i = 0; i < files.length; i++) {
339: String fileName = files[i];
340: if (filter.accept(null, fileName)
341: && (segmentName == null
342: || fileName.startsWith(segmentPrefix1) || fileName
343: .startsWith(segmentPrefix2))
344: && !refCounts.containsKey(fileName)
345: && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
346: // Unreferenced file, so remove it
347: if (infoStream != null) {
348: message("refresh [prefix="
349: + segmentName
350: + "]: removing newly created unreferenced file \""
351: + fileName + "\"");
352: }
353: deleteFile(fileName);
354: }
355: }
356: }
357:
358: public void refresh() throws IOException {
359: refresh(null);
360: }
361:
362: public void close() throws IOException {
363: deletePendingFiles();
364: }
365:
366: private void deletePendingFiles() throws IOException {
367: if (deletable != null) {
368: List oldDeletable = deletable;
369: deletable = null;
370: int size = oldDeletable.size();
371: for (int i = 0; i < size; i++) {
372: if (infoStream != null)
373: message("delete pending file "
374: + oldDeletable.get(i));
375: deleteFile((String) oldDeletable.get(i));
376: }
377: }
378: }
379:
380: /**
381: * For definition of "check point" see IndexWriter comments:
382: * "Clarification: Check Points (and commits)".
383: *
384: * Writer calls this when it has made a "consistent
385: * change" to the index, meaning new files are written to
386: * the index and the in-memory SegmentInfos have been
387: * modified to point to those files.
388: *
389: * This may or may not be a commit (segments_N may or may
390: * not have been written).
391: *
392: * We simply incref the files referenced by the new
393: * SegmentInfos and decref the files we had previously
394: * seen (if any).
395: *
396: * If this is a commit, we also call the policy to give it
397: * a chance to remove other commits. If any commits are
398: * removed, we decref their files as well.
399: */
400: public void checkpoint(SegmentInfos segmentInfos, boolean isCommit)
401: throws IOException {
402:
403: if (infoStream != null) {
404: message("now checkpoint \""
405: + segmentInfos.getCurrentSegmentFileName() + "\" ["
406: + segmentInfos.size() + " segments "
407: + "; isCommit = " + isCommit + "]");
408: }
409:
410: // Try again now to delete any previously un-deletable
411: // files (because they were in use, on Windows):
412: deletePendingFiles();
413:
414: // Incref the files:
415: incRef(segmentInfos, isCommit);
416: final List docWriterFiles;
417: if (docWriter != null) {
418: docWriterFiles = docWriter.files();
419: if (docWriterFiles != null)
420: incRef(docWriterFiles);
421: } else
422: docWriterFiles = null;
423:
424: if (isCommit) {
425: // Append to our commits list:
426: commits.add(new CommitPoint(segmentInfos));
427:
428: // Tell policy so it can remove commits:
429: policy.onCommit(commits);
430:
431: // Decref files for commits that were deleted by the policy:
432: deleteCommits();
433: }
434:
435: // DecRef old files from the last checkpoint, if any:
436: int size = lastFiles.size();
437: if (size > 0) {
438: for (int i = 0; i < size; i++)
439: decRef((List) lastFiles.get(i));
440: lastFiles.clear();
441: }
442:
443: if (!isCommit) {
444: // Save files so we can decr on next checkpoint/commit:
445: size = segmentInfos.size();
446: for (int i = 0; i < size; i++) {
447: SegmentInfo segmentInfo = segmentInfos.info(i);
448: if (segmentInfo.dir == directory) {
449: lastFiles.add(segmentInfo.files());
450: }
451: }
452: }
453: if (docWriterFiles != null)
454: lastFiles.add(docWriterFiles);
455: }
456:
457: void incRef(SegmentInfos segmentInfos, boolean isCommit)
458: throws IOException {
459: int size = segmentInfos.size();
460: for (int i = 0; i < size; i++) {
461: SegmentInfo segmentInfo = segmentInfos.info(i);
462: if (segmentInfo.dir == directory) {
463: incRef(segmentInfo.files());
464: }
465: }
466:
467: if (isCommit) {
468: // Since this is a commit point, also incref its
469: // segments_N file:
470: getRefCount(segmentInfos.getCurrentSegmentFileName())
471: .IncRef();
472: }
473: }
474:
475: void incRef(List files) throws IOException {
476: int size = files.size();
477: for (int i = 0; i < size; i++) {
478: String fileName = (String) files.get(i);
479: RefCount rc = getRefCount(fileName);
480: if (infoStream != null && VERBOSE_REF_COUNTS) {
481: message(" IncRef \"" + fileName
482: + "\": pre-incr count is " + rc.count);
483: }
484: rc.IncRef();
485: }
486: }
487:
488: void decRef(List files) throws IOException {
489: int size = files.size();
490: for (int i = 0; i < size; i++) {
491: decRef((String) files.get(i));
492: }
493: }
494:
495: private void decRef(String fileName) throws IOException {
496: RefCount rc = getRefCount(fileName);
497: if (infoStream != null && VERBOSE_REF_COUNTS) {
498: message(" DecRef \"" + fileName + "\": pre-decr count is "
499: + rc.count);
500: }
501: if (0 == rc.DecRef()) {
502: // This file is no longer referenced by any past
503: // commit points nor by the in-memory SegmentInfos:
504: deleteFile(fileName);
505: refCounts.remove(fileName);
506: }
507: }
508:
509: void decRef(SegmentInfos segmentInfos) throws IOException {
510: final int size = segmentInfos.size();
511: for (int i = 0; i < size; i++) {
512: SegmentInfo segmentInfo = segmentInfos.info(i);
513: if (segmentInfo.dir == directory) {
514: decRef(segmentInfo.files());
515: }
516: }
517: }
518:
519: private RefCount getRefCount(String fileName) {
520: RefCount rc;
521: if (!refCounts.containsKey(fileName)) {
522: rc = new RefCount();
523: refCounts.put(fileName, rc);
524: } else {
525: rc = (RefCount) refCounts.get(fileName);
526: }
527: return rc;
528: }
529:
530: void deleteFiles(List files) throws IOException {
531: final int size = files.size();
532: for (int i = 0; i < size; i++)
533: deleteFile((String) files.get(i));
534: }
535:
536: /** Delets the specified files, but only if they are new
537: * (have not yet been incref'd). */
538: void deleteNewFiles(List files) throws IOException {
539: final int size = files.size();
540: for (int i = 0; i < size; i++)
541: if (!refCounts.containsKey(files.get(i)))
542: deleteFile((String) files.get(i));
543: }
544:
545: void deleteFile(String fileName) throws IOException {
546: try {
547: if (infoStream != null) {
548: message("delete \"" + fileName + "\"");
549: }
550: directory.deleteFile(fileName);
551: } catch (IOException e) { // if delete fails
552: if (directory.fileExists(fileName)) {
553:
554: // Some operating systems (e.g. Windows) don't
555: // permit a file to be deleted while it is opened
556: // for read (e.g. by another process or thread). So
557: // we assume that when a delete fails it is because
558: // the file is open in another process, and queue
559: // the file for subsequent deletion.
560:
561: if (infoStream != null) {
562: message("IndexFileDeleter: unable to remove file \""
563: + fileName
564: + "\": "
565: + e.toString()
566: + "; Will re-try later.");
567: }
568: if (deletable == null) {
569: deletable = new ArrayList();
570: }
571: deletable.add(fileName); // add to deletable
572: }
573: }
574: }
575:
576: /**
577: * Tracks the reference count for a single index file:
578: */
579: final private static class RefCount {
580:
581: int count;
582:
583: public int IncRef() {
584: return ++count;
585: }
586:
587: public int DecRef() {
588: assert count > 0;
589: return --count;
590: }
591: }
592:
593: /**
594: * Holds details for each commit point. This class is
595: * also passed to the deletion policy. Note: this class
596: * has a natural ordering that is inconsistent with
597: * equals.
598: */
599:
600: final private class CommitPoint implements Comparable,
601: IndexCommitPoint {
602:
603: long gen;
604: List files;
605: String segmentsFileName;
606: boolean deleted;
607:
608: public CommitPoint(SegmentInfos segmentInfos)
609: throws IOException {
610: segmentsFileName = segmentInfos.getCurrentSegmentFileName();
611: int size = segmentInfos.size();
612: files = new ArrayList(size);
613: files.add(segmentsFileName);
614: gen = segmentInfos.getGeneration();
615: for (int i = 0; i < size; i++) {
616: SegmentInfo segmentInfo = segmentInfos.info(i);
617: if (segmentInfo.dir == directory) {
618: files.addAll(segmentInfo.files());
619: }
620: }
621: }
622:
623: /**
624: * Get the segments_N file for this commit point.
625: */
626: public String getSegmentsFileName() {
627: return segmentsFileName;
628: }
629:
630: public Collection getFileNames() throws IOException {
631: return Collections.unmodifiableCollection(files);
632: }
633:
634: /**
635: * Called only be the deletion policy, to remove this
636: * commit point from the index.
637: */
638: public void delete() {
639: if (!deleted) {
640: deleted = true;
641: commitsToDelete.add(this );
642: }
643: }
644:
645: public int compareTo(Object obj) {
646: CommitPoint commit = (CommitPoint) obj;
647: if (gen < commit.gen) {
648: return -1;
649: } else if (gen > commit.gen) {
650: return 1;
651: } else {
652: return 0;
653: }
654: }
655: }
656: }
|