001: /*-
002: * See the file LICENSE for redistribution information.
003: *
004: * Copyright (c) 2002,2008 Oracle. All rights reserved.
005: *
006: * $Id: FileSelector.java,v 1.15.2.4 2008/01/07 15:14:08 cwl Exp $
007: */
008:
009: package com.sleepycat.je.cleaner;
010:
011: import java.util.Collections;
012: import java.util.HashMap;
013: import java.util.HashSet;
014: import java.util.LinkedList;
015: import java.util.List;
016: import java.util.Map;
017: import java.util.Set;
018: import java.util.SortedSet;
019: import java.util.TreeSet;
020:
021: import com.sleepycat.je.DatabaseException;
022: import com.sleepycat.je.dbi.DatabaseId;
023: import com.sleepycat.je.tree.LN;
024:
025: /**
026: * Keeps track of the status of files for which cleaning is in progress.
027: */
public class FileSelector {

    /*
     * Each file for which cleaning is in progress is in one of the following
     * collections. File numbers migrate from one collection to another as
     * their status changes, in order:
     *
     * toBeCleaned -> beingCleaned -> cleaned ->
     * checkpointed -> fullyProcessed -> safeToDelete
     *
     * Access to these collections is synchronized to guarantee that the status
     * is atomically updated.
     */

    /*
     * A file is initially to-be-cleaned when it is selected as part of a batch
     * of files that, when deleted, will bring total utilization down to the
     * minimum configured value. All files in this collection will be cleaned
     * in lowest-cost-to-clean order. For two files of equal cost to clean,
     * the lower numbered (oldest) file is selected; this is why the set is
     * sorted.
     */
    private SortedSet toBeCleanedFiles;

    /*
     * When a file is selected for processing by FileProcessor from the
     * to-be-cleaned list, it is added to this processing set. This
     * distinction is used to prevent a file from being processed by more than
     * one thread.
     */
    private Set beingCleanedFiles;

    /*
     * A file moves to the cleaned set when all log entries have been read and
     * processed. However, entries needing migration will be marked with the
     * BIN entry MIGRATE flag, entries that could not be locked will be in the
     * pending LN set, and the DBs that were pending deletion will be in the
     * pending DB set.
     *
     * After processing a file, we may identify a set of deferred write dbs
     * which need a sync before we can safely delete the file. The
     * cleanedDeferredWriteDbs is the collection of DatabaseIds for those dw
     * dbs, and goes lockstep with the cleanedFiles. It's implemented as a
     * list rather than a set, because a DW db may show up multiple times
     * for different files, unlike the files in the cleanedFiles set.
     * For example, suppose
     * - file 20 is cleaned and adds DW dbs 1,3
     * - file 25 is cleaned and adds DW dbs 1,5
     * - a checkpoint starts, and copies dw db 1,3,5
     * - file 30 is cleaned, and adds DW dbs 1,5
     * When the ckpt returns and removes the proper entries from the list,
     * it must make sure that dbs 1,5 stay on the list, because that sync
     * must happen for file 30.
     */
    private Set cleanedFiles;
    private LinkedList cleanedDeferredWriteDbs;

    /*
     * A file moves to the checkpointed set at the end of a checkpoint if it
     * was in the cleaned set at the beginning of the checkpoint. Because all
     * dirty BINs are flushed during the checkpoints, no files in this set
     * will have entries with the MIGRATE flag set. However, some entries may
     * be in the pending LN set and some DBs may be in the pending DB set.
     */
    private Set checkpointedFiles;

    /*
     * A file is moved from the checkpointed set to the fully-processed set
     * when the pending LN/DB sets become empty. Since a pending LN was not
     * locked successfully, we don't know its original file. But we do know
     * that when no pending LNs are present for any file, all log entries in
     * checkpointed files are either obsolete or have been migrated. Note,
     * however, that the parent BINs of the migrated entries may not have been
     * logged yet.
     *
     * No special handling is required to coordinate syncing of deferred write
     * databases for pending, deferred write LNs. Note that although DW
     * databases are non-txnal, their LNs may be pended because of lock
     * collisions. Any required syncing falls out naturally because dbs are
     * entered into the cleanedDeferredWriteDbs list when a member LN is
     * successfully processed and removed from the pendingLN set, whether that
     * happens on the first pass of processing the file, or on future passes
     * when the LN is removed from the pending set.
     */
    private Set fullyProcessedFiles;

    /*
     * A file moves to the safe-to-delete set at the end of a checkpoint if it
     * was in the fully-processed set at the beginning of the checkpoint. All
     * parent BINs of migrated entries have now been logged, and any
     * deferred write db syncs have been executed and the files are
     * safe to delete.
     */
    private Set safeToDeleteFiles;

    /*
     * Pending LNs are stored in a map of {NodeID -> LNInfo}. These are LNs
     * that could not be locked, either during processing or during migration.
     */
    private Map pendingLNs;

    /*
     * For processed entries with DBs that are pending deletion, we consider
     * them to be obsolete but we store their DatabaseIds in a set. Until the
     * DB deletion is complete, we can't delete the log files containing those
     * entries.
     */
    private Set pendingDBs;

    /*
     * If during a checkpoint there are no pending LNs or DBs added, we can
     * move cleaned files to safe-delete files at the end of the checkpoint.
     * This is an optimization that allows deleting files more quickly when
     * possible. In particular this impacts the checkpoint during environment
     * close, since no user operations are active during that checkpoint; this
     * optimization allows us to delete all cleaned files after the final
     * checkpoint.
     */
    private boolean anyPendingDuringCheckpoint;

    /*
     * As a side effect of file selection a set of low utilization files is
     * determined. This set is guaranteed to be non-null and read-only, so no
     * synchronization is needed when accessing it.
     *
     * NOTE(review): this field is not volatile and is reassigned outside any
     * synchronized block in selectFileForCleaning; cross-thread visibility of
     * the latest reference is therefore not guaranteed by the JMM -- confirm
     * that stale reads are acceptable here.
     */
    private Set lowUtilizationFiles;

    /* Initializes all status collections to empty. */
    FileSelector() {
        toBeCleanedFiles = new TreeSet();
        cleanedFiles = new HashSet();
        cleanedDeferredWriteDbs = new LinkedList();
        checkpointedFiles = new HashSet();
        fullyProcessedFiles = new HashSet();
        safeToDeleteFiles = new HashSet();
        pendingLNs = new HashMap();
        pendingDBs = new HashSet();
        lowUtilizationFiles = Collections.EMPTY_SET;
        beingCleanedFiles = new HashSet();
    }

    /**
     * Returns the best file that qualifies for cleaning, or null if no file
     * qualifies. This method is not thread safe and should only be called
     * from the cleaner thread.
     *
     * @param profile the utilization profile used both to pick candidate
     * files and to choose the cheapest file to clean from the candidates.
     *
     * @param forceCleaning is true to always select a file, even if its
     * utilization is above the minimum utilization threshold.
     *
     * @param calcLowUtilizationFiles whether to recalculate the set of files
     * that are below the minimum utilization threshold.
     *
     * @param maxBatchFiles is the maximum number of files to be selected at
     * one time, or zero if there is no limit.
     *
     * @return the next file to be cleaned, or null if no file needs cleaning.
     */
    Long selectFileForCleaning(UtilizationProfile profile,
                               boolean forceCleaning,
                               boolean calcLowUtilizationFiles,
                               int maxBatchFiles)
        throws DatabaseException {

        /* Only allocate a fresh set when the caller asked for a recalc. */
        Set newLowUtilizationFiles = calcLowUtilizationFiles ? (new HashSet())
                                                             : null;

        /*
         * Add files until we reach the theoretical minimum utilization
         * threshold.
         */
        while (true) {

            /* Respect the batch-size limit, if one was given. */
            if (maxBatchFiles > 0) {
                synchronized (this) {
                    if (toBeCleanedFiles.size() >= maxBatchFiles) {
                        break;
                    }
                }
            }

            Long fileNum = profile.getBestFileForCleaning(this,
                                                          forceCleaning,
                                                          newLowUtilizationFiles);

            if (fileNum == null) {
                break;
            }

            synchronized (this) {
                toBeCleanedFiles.add(fileNum);
            }
        }

        /* Update the read-only set. */
        if (newLowUtilizationFiles != null) {
            lowUtilizationFiles = newLowUtilizationFiles;
        }

        /*
         * Select the cheapest file to clean from a copy of the to-be-cleaned
         * set. Then move the file from the to-be-cleaned set to the
         * being-cleaned set.
         */
        SortedSet availableFiles;
        synchronized (this) {
            availableFiles = new TreeSet(toBeCleanedFiles);
        }
        Long file = profile.getCheapestFileToClean(availableFiles);
        if (file != null) {
            synchronized (this) {
                toBeCleanedFiles.remove(file);
                beingCleanedFiles.add(file);
            }
        }
        return file;
    }

    /**
     * Returns whether the file is in any stage of the cleaning process.
     */
    synchronized boolean isFileCleaningInProgress(Long file) {
        return toBeCleanedFiles.contains(file)
            || beingCleanedFiles.contains(file)
            || cleanedFiles.contains(file)
            || checkpointedFiles.contains(file)
            || fullyProcessedFiles.contains(file)
            || safeToDeleteFiles.contains(file);
    }

    /**
     * Removes all references to a file.
     */
    synchronized void removeAllFileReferences(Long file) {
        toBeCleanedFiles.remove(file);
        beingCleanedFiles.remove(file);
        cleanedFiles.remove(file);
        checkpointedFiles.remove(file);
        fullyProcessedFiles.remove(file);
        safeToDeleteFiles.remove(file);
    }

    /**
     * When file cleaning is aborted, move the file back from the being-cleaned
     * set to the to-be-cleaned set.
     */
    synchronized void putBackFileForCleaning(Long fileNum) {
        toBeCleanedFiles.add(fileNum);
        beingCleanedFiles.remove(fileNum);
    }

    /**
     * When cleaning is complete, move the file from the being-cleaned set to
     * the cleaned set.
     *
     * @param fileNum the file whose processing just finished.
     * @param deferredWriteDbs DatabaseIds of deferred write dbs that must be
     * synced before fileNum can safely be deleted; appended (with duplicates
     * allowed) to the cleanedDeferredWriteDbs list -- see the field comment
     * for why duplicates are required.
     */
    synchronized void addCleanedFile(Long fileNum, Set deferredWriteDbs) {
        cleanedFiles.add(fileNum);
        cleanedDeferredWriteDbs.addAll(deferredWriteDbs);
        beingCleanedFiles.remove(fileNum);
    }

    /**
     * Returns a read-only set of low utilization files that can be accessed
     * without synchronization.
     */
    Set getLowUtilizationFiles() {
        /* This set is read-only, so there is no need to make a copy. */
        return lowUtilizationFiles;
    }

    /**
     * Returns a read-only copy of to-be-cleaned and being-cleaned files that
     * can be accessed without synchronization.
     */
    synchronized Set getMustBeCleanedFiles() {
        Set set = new HashSet(toBeCleanedFiles);
        set.addAll(beingCleanedFiles);
        return set;
    }

    /**
     * Returns the number of files waiting to-be-cleaned.
     */
    synchronized int getBacklog() {
        return toBeCleanedFiles.size();
    }

    /**
     * Returns a copy of the cleaned and fully-processed files at the time a
     * checkpoint starts. Also records, for updateFilesAtCheckpointEnd,
     * whether any LNs/DBs were already pending when the checkpoint began.
     */
    synchronized CheckpointStartCleanerState getFilesAtCheckpointStart() {

        anyPendingDuringCheckpoint = !pendingLNs.isEmpty()
            || !pendingDBs.isEmpty();

        CheckpointStartCleanerState info = new CheckpointStartCleanerState(
            cleanedFiles, fullyProcessedFiles,
            cleanedDeferredWriteDbs);
        return info;
    }

    /**
     * When a checkpoint is complete, move the previously cleaned and
     * fully-processed files to the checkpointed and safe-to-delete sets.
     * Also take the dbs that have been synced through this checkpoint off
     * their place at the top of the deferredWriteDb list.
     */
    synchronized void updateFilesAtCheckpointEnd(
        CheckpointStartCleanerState info) {

        if (!info.isEmpty()) {

            Set previouslyCleanedFiles = info.getCleanedFiles();
            if (previouslyCleanedFiles != null) {
                /*
                 * If anything became pending during the checkpoint, cleaned
                 * files must wait in the checkpointed set; otherwise they can
                 * jump straight to safe-to-delete.
                 */
                if (anyPendingDuringCheckpoint) {
                    checkpointedFiles.addAll(previouslyCleanedFiles);
                } else {
                    safeToDeleteFiles.addAll(previouslyCleanedFiles);
                }
                cleanedFiles.removeAll(previouslyCleanedFiles);
            }

            Set previouslyProcessedFiles = info
                .getFullyProcessedFiles();
            if (previouslyProcessedFiles != null) {
                safeToDeleteFiles.addAll(previouslyProcessedFiles);
                fullyProcessedFiles.removeAll(previouslyProcessedFiles);
            }

            /*
             * The checkpoint synced the dbs snapshotted at its start; drop
             * exactly that many entries from the head of the list. Entries
             * added after the snapshot (for files cleaned during the
             * checkpoint) remain at the tail.
             */
            int previousSize = cleanedDeferredWriteDbs.size();
            int numDbsSyncedByCheckpoint = info
                .getDeferredWriteDbsSize();
            for (int i = 0; i < numDbsSyncedByCheckpoint; i++) {
                cleanedDeferredWriteDbs.removeFirst();
            }

            assert cleanedDeferredWriteDbs.size() == previousSize
                - numDbsSyncedByCheckpoint;

            updateProcessedFiles();
        }
    }

    /**
     * Adds the given LN info to the pending LN set.
     *
     * Note: the return value is true when a pending entry for this node ID
     * was already present (the put replaced an existing mapping), and false
     * when the entry is new -- i.e. it reports replacement, not insertion.
     */
    synchronized boolean addPendingLN(LN ln, DatabaseId dbId,
                                      byte[] key, byte[] dupKey) {
        assert ln != null;

        boolean added = pendingLNs.put(new Long(ln.getNodeId()),
                                       new LNInfo(ln, dbId, key, dupKey)) != null;

        /* Any new pending work delays cleaned files for one checkpoint. */
        anyPendingDuringCheckpoint = true;
        return added;
    }

    /**
     * Returns an array of LNInfo for LNs that could not be migrated in a
     * prior cleaning attempt, or null if no LNs are pending.
     */
    synchronized LNInfo[] getPendingLNs() {

        if (pendingLNs.size() > 0) {
            LNInfo[] lns = new LNInfo[pendingLNs.size()];
            pendingLNs.values().toArray(lns);
            return lns;
        } else {
            return null;
        }
    }

    /**
     * Removes the LN for the given node ID from the pending LN set.
     */
    synchronized void removePendingLN(long nodeId) {

        pendingLNs.remove(new Long(nodeId));
        /* Pending sets may now be empty; try to advance checkpointed files. */
        updateProcessedFiles();
    }

    /**
     * Adds the given DatabaseId to the pending DB set.
     *
     * @return true if the DatabaseId was not already in the pending DB set.
     */
    synchronized boolean addPendingDB(DatabaseId dbId) {

        boolean added = pendingDBs.add(dbId);

        /* Any new pending work delays cleaned files for one checkpoint. */
        anyPendingDuringCheckpoint = true;
        return added;
    }

    /**
     * Returns an array of DatabaseIds for DBs that were pending deletion in a
     * prior cleaning attempt, or null if no DBs are pending.
     */
    synchronized DatabaseId[] getPendingDBs() {

        if (pendingDBs.size() > 0) {
            DatabaseId[] dbs = new DatabaseId[pendingDBs.size()];
            pendingDBs.toArray(dbs);
            return dbs;
        } else {
            return null;
        }
    }

    /**
     * Removes the DatabaseId from the pending DB set.
     */
    synchronized void removePendingDB(DatabaseId dbId) {

        pendingDBs.remove(dbId);
        /* Pending sets may now be empty; try to advance checkpointed files. */
        updateProcessedFiles();
    }

    /**
     * Returns a copy of the safe-to-delete files, or null if the set is
     * empty (callers must handle the null return).
     */
    synchronized Set copySafeToDeleteFiles() {
        if (safeToDeleteFiles.size() == 0) {
            return null;
        } else {
            return new HashSet(safeToDeleteFiles);
        }
    }

    /**
     * Removes file from the safe-to-delete set after the file itself has
     * finally been deleted.
     */
    synchronized void removeDeletedFile(Long fileNum) {
        safeToDeleteFiles.remove(fileNum);
    }

    /**
     * If there are no pending LNs or DBs outstanding, move the checkpointed
     * files to the fully-processed set. The check for pending LNs/DBs and the
     * copying of the checkpointed files must be done atomically in a
     * synchronized block. All methods that call this method are synchronized.
     */
    private void updateProcessedFiles() {
        if (pendingLNs.isEmpty() && pendingDBs.isEmpty()) {
            fullyProcessedFiles.addAll(checkpointedFiles);
            checkpointedFiles.clear();
        }
    }

    /*
     * Holds copy of all checkpoint-dependent cleaner state.
     */
    static public class CheckpointStartCleanerState {

        /* A snapshot of the cleaner collections at the checkpoint start. */
        private Set cleanedFiles;
        private Set fullyProcessedFiles;
        /*
         * Note: snapshotted as a HashSet, so getDeferredWriteDbsSize() is the
         * number of DISTINCT dbs, while the live cleanedDeferredWriteDbs list
         * may hold duplicates -- see updateFilesAtCheckpointEnd, which removes
         * this many entries from the head of the list.
         */
        private Set deferredWriteDbs;

        CheckpointStartCleanerState(Set cleanedFiles,
                                    Set fullyProcessedFiles,
                                    List cleanedDeferredWriteDbs) {

            /*
             * Create snapshots of the collections of various files at the
             * beginning of the checkpoint.
             */
            this.cleanedFiles = new HashSet(cleanedFiles);
            this.fullyProcessedFiles = new HashSet(fullyProcessedFiles);
            deferredWriteDbs = new HashSet(cleanedDeferredWriteDbs);
        }

        public boolean isEmpty() {
            return ((cleanedFiles.size() == 0)
                && (fullyProcessedFiles.size() == 0) && (deferredWriteDbs
                .size() == 0));
        }

        public Set getCleanedFiles() {
            return cleanedFiles;
        }

        public Set getFullyProcessedFiles() {
            return fullyProcessedFiles;
        }

        public Set getDeferredWriteDbs() {
            return deferredWriteDbs;
        }

        public int getDeferredWriteDbsSize() {
            return deferredWriteDbs.size();
        }
    }
}
|