Skip to content

Commit

Permalink
Fixing the instant listing using completion time
Browse files Browse the repository at this point in the history
  • Loading branch information
linliu-code committed Sep 20, 2024
1 parent ad4952b commit 7046d40
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -288,9 +287,9 @@ public HoodieCleanMetadata execute() {
}

private HoodieCleanMetadata getMetadataWithMaxCompletionTime(List<HoodieCleanMetadata> cleanMetadata) {
Optional<HoodieCleanMetadata> r = cleanMetadata.stream().reduce((e1, e2) ->
e1.getLastCompletedCommitTimestamp().compareTo(e2.getLastCompletedCommitTimestamp()) > 0 ? e1 : e2);
return r.orElse(null);
return cleanMetadata.stream().reduce((e1, e2) ->
e1.getLastCompletedCommitTimestamp().compareTo(e2.getLastCompletedCommitTimestamp()) > 0 ? e1 : e2)
.orElse(null);
}

private void checkIfOtherWriterCommitted(HoodieInstant hoodieInstant, HoodieIOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import java.util.stream.Collectors;

import static org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.common.util.CleanerUtils.SAVEPOINTED_TIMESTAMPS;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;

public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {

Expand All @@ -76,7 +76,7 @@ private int getCommitsSinceLastCleaning() {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(lastCleanInstant.get()).get());
String lastCompletedCommitTimestamp = cleanMetadata.getLastCompletedCommitTimestamp();
numCommits = commitTimeline.findInstantsAfter(lastCompletedCommitTimestamp).countInstants();
numCommits = commitTimeline.findInstantsAfterCompletionTime(lastCompletedCommitTimestamp).countInstants();
} catch (IOException e) {
throw new HoodieIOException("Parsing of last clean instant " + lastCleanInstant.get() + " failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ public HoodieTimeline findInstantsAfter(String instantTime) {
.filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details);
}

@Override
public HoodieTimeline findInstantsAfterCompletionTime(String instantTime) {
return new HoodieDefaultTimeline(getInstantsAsStream()
.filter(s -> compareTimestamps(s.getCompletionTime(), GREATER_THAN, instantTime)), details);
}

@Override
public HoodieDefaultTimeline findInstantsAfterOrEquals(String commitTime, int numCommits) {
return new HoodieDefaultTimeline(getInstantsAsStream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsAfter(String instantTime);

HoodieTimeline findInstantsAfterCompletionTime(String instantTime);

/**
* Create a new Timeline with all instants before specified time.
*/
Expand Down

0 comments on commit 7046d40

Please sign in to comment.