Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
linliu-code committed Sep 20, 2024
1 parent 7046d40 commit b609b29
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,16 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
LOG.info("Nothing to clean here. It is already clean");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
}
LOG.info("Earliest commit to retain for clean : " + (earliestInstant.isPresent() ? earliestInstant.get().getTimestamp() : "null"));
LOG.info("Total partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
LOG.info(
"Earliest commit to retain for clean : {}",
earliestInstant.isPresent() ? earliestInstant.get().getTimestamp() : "null");
LOG.info(
"Total partitions to clean : {}, with policy {}",
partitionsToClean.size(),
config.getCleanerPolicy());
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
LOG.info(
"Using cleanerParallelism: {}", cleanerParallelism);

context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());

Expand Down Expand Up @@ -145,11 +151,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
.collect(Collectors.toList()));
}

return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
planner.getLastCompletedCommitTimestamp(),
config.getCleanerPolicy().name(), Collections.emptyMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete, prepareExtraMetadata(planner.getSavepointedTimestamps()));
return new HoodieCleanerPlan(
earliestInstant.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
planner.getLastCompletedCommitTimestamp(), // Note: This is the start time of the last completed ingestion before this clean.
config.getCleanerPolicy().name(),
Collections.emptyMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
cleanOps,
partitionsToDelete,
prepareExtraMetadata(planner.getSavepointedTimestamps()));
} catch (IOException e) {
throw new HoodieIOException("Failed to schedule clean operation", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> in
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)
&& !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()
&& !hoodieTable.getActiveTimeline().getCommitsTimeline().isBeforeTimelineStarts(cleanMetadata.getEarliestCommitToRetain())) {
return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
}
Expand All @@ -204,9 +204,11 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
LOG.info("Since savepoints have been removed compared to previous clean, triggering clean planning for all partitions");
return getPartitionPathsForFullCleaning();
} else {
LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
LOG.info(
"Incremental Cleaning mode is enabled. Looking up partition-paths that have changed "
+ "since last clean at {}. New Instant to retain {}.",
cleanMetadata.getEarliestCommitToRetain(),
newInstantToRetain);

return hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream()
.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
Expand Down Expand Up @@ -288,8 +290,11 @@ private boolean isFileSliceExistInSavepointedFiles(FileSlice fs, List<String> sa
* single file (i.e., run it with versionsRetained = 1)
*/
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+ " file versions. ");
LOG.info(
"Cleaning {}, retaining latest {} file versions.",
partitionPath,
config.getCleanerFileVersionsRetained());

List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
Expand Down

0 comments on commit b609b29

Please sign in to comment.