Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8077] Improve logging in clean planning #11979

Merged
Merged 1 commit on Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,16 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
LOG.info("Nothing to clean here. It is already clean");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
}
LOG.info("Earliest commit to retain for clean : " + (earliestInstant.isPresent() ? earliestInstant.get().getTimestamp() : "null"));
LOG.info("Total partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
LOG.info(
"Earliest commit to retain for clean : {}",
earliestInstant.isPresent() ? earliestInstant.get().getTimestamp() : "null");
LOG.info(
"Total partitions to clean : {}, with policy {}",
partitionsToClean.size(),
config.getCleanerPolicy());
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
LOG.info(
"Using cleanerParallelism: {}", cleanerParallelism);

context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());

Expand Down Expand Up @@ -145,11 +151,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
.collect(Collectors.toList()));
}

return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
planner.getLastCompletedCommitTimestamp(),
config.getCleanerPolicy().name(), Collections.emptyMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete, prepareExtraMetadata(planner.getSavepointedTimestamps()));
return new HoodieCleanerPlan(
earliestInstant.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
planner.getLastCompletedCommitTimestamp(), // Note: This is the start time of the last completed ingestion before this clean.
config.getCleanerPolicy().name(),
Collections.emptyMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
cleanOps,
partitionsToDelete,
prepareExtraMetadata(planner.getSavepointedTimestamps()));
} catch (IOException e) {
throw new HoodieIOException("Failed to schedule clean operation", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> in
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)
&& !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()
&& !hoodieTable.getActiveTimeline().getCommitsTimeline().isBeforeTimelineStarts(cleanMetadata.getEarliestCommitToRetain())) {
return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
}
Expand All @@ -204,9 +204,11 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
LOG.info("Since savepoints have been removed compared to previous clean, triggering clean planning for all partitions");
return getPartitionPathsForFullCleaning();
} else {
LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
LOG.info(
"Incremental Cleaning mode is enabled. Looking up partition-paths that have changed "
+ "since last clean at {}. New Instant to retain {}.",
cleanMetadata.getEarliestCommitToRetain(),
newInstantToRetain);

return hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream()
.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
Expand Down Expand Up @@ -288,8 +290,11 @@ private boolean isFileSliceExistInSavepointedFiles(FileSlice fs, List<String> sa
* single file (i.e., run it with versionsRetained = 1)
*/
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+ " file versions. ");
LOG.info(
"Cleaning {}, retaining latest {} file versions.",
partitionPath,
config.getCleanerFileVersionsRetained());

List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
Expand Down
Loading