Parallelize file version deletes during clean and related tests
kaushikd49 authored and prazanna committed Jun 16, 2017
1 parent dda28c0 commit 521555c
Showing 7 changed files with 459 additions and 299 deletions.
19 changes: 2 additions & 17 deletions hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -50,7 +50,6 @@
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.BulkInsertMapFunction;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.HoodieCleaner;
import com.uber.hoodie.io.HoodieCommitArchiveLog;
import com.uber.hoodie.metrics.HoodieMetrics;
import com.uber.hoodie.table.HoodieTable;
@@ -738,25 +737,11 @@ private void clean(String startCleanTime) throws HoodieIOException {
HoodieTable<T> table = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);

List<String> partitionsToClean =
FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
// shuffle to distribute cleaning work across partitions evenly
Collections.shuffle(partitionsToClean);
logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config
.getCleanerPolicy());
if (partitionsToClean.isEmpty()) {
logger.info("Nothing to clean here mom. It is already clean");
List<HoodieCleanStat> cleanStats = table.clean(jsc);
if (cleanStats.isEmpty()) {
return;
}

int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
List<HoodieCleanStat> cleanStats = jsc.parallelize(partitionsToClean, cleanerParallelism)
.map((Function<String, HoodieCleanStat>) partitionPathToClean -> {
HoodieCleaner cleaner = new HoodieCleaner(table, config);
return cleaner.clean(partitionPathToClean);
})
.collect();

// Emit metrics (duration, numFilesDeleted) if needed
Optional<Long> durationInMs = Optional.empty();
if (context != null) {
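The two added lines above delegate the whole clean to the table layer; HoodieTable.clean(jsc) itself is in one of the other changed files and is not shown in this excerpt. A plausible reconstruction, assembled from the removed block above and the HoodieCleanHelper API introduced below — a sketch under those assumptions, not the commit's verbatim implementation:

  // Hypothetical sketch of HoodieTable.clean(jsc), inferred from the removed
  // client-side code and the new helper API shown below.
  public List<HoodieCleanStat> clean(JavaSparkContext jsc) throws HoodieIOException {
    try {
      // same partition discovery the client used to do
      List<String> partitionsToClean = FSUtils.getAllPartitionPaths(fs,
          getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
      if (partitionsToClean.isEmpty()) {
        return Collections.emptyList();
      }
      // shuffle to distribute cleaning work across partitions evenly
      Collections.shuffle(partitionsToClean);
      int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
      return jsc.parallelize(partitionsToClean, cleanerParallelism)
          .map(partitionPath -> {
            // plan the delete paths for this partition on the executor
            HoodieCleanHelper<T> helper = new HoodieCleanHelper<>(this, config);
            List<String> deletePaths = helper.getDeletePaths(partitionPath);
            // glob and delete each pattern (see the delete-step sketch further down);
            // per the commit title, these per-file deletes are parallelized as well
            return HoodieCleanStat.newBuilder()
                .withPolicy(config.getCleanerPolicy())
                .withDeletePathPattern(deletePaths)
                .withPartitionPath(partitionPath)
                .withEarliestCommitRetained(helper.getEarliestCommitToRetain())
                .build();
          })
          .collect();
    } catch (IOException e) {
      throw new HoodieIOException("Failed to clean partitions", e);
    }
  }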
hoodie-client/src/main/java/com/uber/hoodie/io/{HoodieCleaner.java → HoodieCleanHelper.java}
@@ -16,9 +16,6 @@

package com.uber.hoodie.io;

import com.clearspring.analytics.util.Lists;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieRecordPayload;
@@ -30,22 +27,16 @@
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Cleaner is responsible for garbage collecting older files in a given partition path, such that
@@ -56,16 +47,16 @@
* <p>
* TODO: Should all cleaning be done based on {@link com.uber.hoodie.common.model.HoodieCommitMetadata}
*/
public class HoodieCleaner<T extends HoodieRecordPayload<T>> {
private static Logger logger = LogManager.getLogger(HoodieCleaner.class);
public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
private static Logger logger = LogManager.getLogger(HoodieCleanHelper.class);

private final TableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline;
private HoodieTable<T> hoodieTable;
private HoodieWriteConfig config;
private FileSystem fs;

public HoodieCleaner(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
public HoodieCleanHelper(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
this.hoodieTable = hoodieTable;
this.fileSystemView = hoodieTable.getCompactedFileSystemView();
this.commitTimeline = hoodieTable.getCompletedCommitTimeline();
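The class javadoc above frames the renamed helper's job: garbage-collect older files in a given partition path while retaining enough history per the configured policy. As a toy, self-contained illustration of the KEEP_LATEST_FILE_VERSIONS idea it implements in the hunks below (all names here are illustrative, not Hudi API):

  import java.util.Arrays;
  import java.util.List;

  public class KeepLatestVersionsDemo {
    // Keep the newest N versions of a file group; everything older is a delete candidate.
    static List<String> versionsToDelete(List<String> versionsNewestFirst, int versionsRetained) {
      int keep = Math.min(versionsRetained, versionsNewestFirst.size());
      return versionsNewestFirst.subList(keep, versionsNewestFirst.size());
    }

    public static void main(String[] args) {
      List<String> versions = Arrays.asList("file1_c4.parquet", "file1_c3.parquet", "file1_c1.parquet");
      // retain 2 versions -> only the oldest one is deleted
      System.out.println(versionsToDelete(versions, 2)); // [file1_c1.parquet]
    }
  }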
@@ -110,11 +101,11 @@ private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath)
// Delete the remaining files
while (commitItr.hasNext()) {
HoodieDataFile nextRecord = commitItr.next();
deletePaths.add(String.format("%s/%s/%s", config.getBasePath(), partitionPath,
nextRecord.getFileName()));
deletePaths.add(nextRecord.getFileStatus().getPath().toString());
if (hoodieTable.getMetaClient().getTableType()
== HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
// todo: fix below for MERGE_ON_READ
deletePaths.add(String
.format("%s/%s/%s", config.getBasePath(), partitionPath,
FSUtils.maskWithoutLogVersion(nextRecord.getCommitTime(),
@@ -158,8 +149,7 @@ private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath)

// determine if we have enough commits, to start cleaning.
if (commitTimeline.countInstants() > commitsRetained) {
HoodieInstant earliestCommitToRetain =
commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).get();
HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
List<List<HoodieDataFile>> fileVersions =
fileSystemView.getEveryVersionInPartition(partitionPath)
.collect(Collectors.toList());
@@ -192,12 +182,11 @@ private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath)
fileCommitTime,
HoodieTimeline.GREATER)) {
// this is a commit, that should be cleaned.
deletePaths.add(String
.format("%s/%s/%s", config.getBasePath(), partitionPath, FSUtils
.maskWithoutTaskPartitionId(fileCommitTime, afile.getFileId())));
deletePaths.add(afile.getFileStatus().getPath().toString());
if (hoodieTable.getMetaClient().getTableType()
== HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
// todo: fix below for MERGE_ON_READ
deletePaths.add(String
.format("%s/%s/%s", config.getBasePath(), partitionPath,
FSUtils.maskWithoutLogVersion(fileCommitTime, afile.getFileId(),
@@ -228,49 +217,36 @@ private String getLatestVersionBeforeCommit(List<HoodieDataFile> fileList,
return null;
}
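Only the tail of getLatestVersionBeforeCommit is visible above. Judging from its name and its use in getFilesToCleanKeepingLatestCommits, it finds the most recent file version written before a given commit, so readers pinned to retained commits still have a version to serve; whether the comparison is strict is not visible in this hunk. A toy re-implementation over commit-time strings under those assumptions:

  import java.util.Arrays;
  import java.util.List;

  public class LatestVersionBeforeCommitDemo {
    // versionCommitTimes is ordered newest-first; Hudi commit times are
    // timestamp strings, so lexicographic order tracks chronological order.
    static String latestVersionBeforeCommit(List<String> versionCommitTimes, String commitTime) {
      for (String fileCommitTime : versionCommitTimes) {
        if (fileCommitTime.compareTo(commitTime) < 0) {
          return fileCommitTime;
        }
      }
      return null; // mirrors the "return null" fall-through above
    }

    public static void main(String[] args) {
      System.out.println(latestVersionBeforeCommit(Arrays.asList("005", "003", "001"), "004")); // 003
    }
  }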


/**
* Performs cleaning of the partition path according to cleaning policy and returns the number
* of files cleaned.
*
* @throws IllegalArgumentException if unknown cleaning policy is provided
* Returns files to be cleaned for the given partitionPath based on cleaning policy.
*/
public HoodieCleanStat clean(String partitionPath) throws IOException {
public List<String> getDeletePaths(String partitionPath) throws IOException {
HoodieCleaningPolicy policy = config.getCleanerPolicy();
List<String> deletePaths;
Optional<HoodieInstant> earliestCommitToRetain = Optional.empty();
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
int commitsRetained = config.getCleanerCommitsRetained();
if (commitTimeline.countInstants() > commitsRetained) {
earliestCommitToRetain =
commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
}
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
} else {
throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
}
logger.info(
deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);

// perform the actual deletes
Map<FileStatus, Boolean> deletedFiles = Maps.newHashMap();
for (String deletePath : deletePaths) {
logger.info("Working on delete path :" + deletePath);
FileStatus[] deleteVersions = fs.globStatus(new Path(deletePath));
if (deleteVersions != null) {
for (FileStatus deleteVersion : deleteVersions) {
boolean deleteResult = fs.delete(deleteVersion.getPath(), false);
deletedFiles.put(deleteVersion, deleteResult);
if (deleteResult) {
logger.info("Cleaned file at path :" + deleteVersion.getPath());
}
}
}
}
return deletePaths;
}

logger.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);
return HoodieCleanStat.newBuilder().withPolicy(policy).withDeletePathPattern(deletePaths)
.withPartitionPath(partitionPath).withEarliestCommitRetained(earliestCommitToRetain)
.withDeletedFileResults(deletedFiles).build();
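The loop removed above is where clean() used to glob each pattern and delete the matches inline; after this change the helper only returns the patterns, and the glob-and-delete step runs elsewhere in the table layer (parallelized, per the commit title). Factored out as a self-contained method, essentially the removed code:

  import java.io.IOException;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class CleanDeleteStep {
    // Glob each planned pattern and delete every match (non-recursively),
    // recording a per-file success flag, as the removed loop did.
    static Map<FileStatus, Boolean> deletePlannedFiles(FileSystem fs, List<String> deletePaths)
        throws IOException {
      Map<FileStatus, Boolean> deletedFiles = new HashMap<>();
      for (String deletePath : deletePaths) {
        FileStatus[] deleteVersions = fs.globStatus(new Path(deletePath));
        if (deleteVersions != null) {
          for (FileStatus deleteVersion : deleteVersions) {
            deletedFiles.put(deleteVersion, fs.delete(deleteVersion.getPath(), false));
          }
        }
      }
      return deletedFiles;
    }
  }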
/**
* Returns earliest commit to retain based on cleaning policy.
*/
public Optional<HoodieInstant> getEarliestCommitToRetain() {
Optional<HoodieInstant> earliestCommitToRetain = Optional.empty();
int commitsRetained = config.getCleanerCommitsRetained();
if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
&& commitTimeline.countInstants() > commitsRetained) {
earliestCommitToRetain =
commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
}
return earliestCommitToRetain;
}
}
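getEarliestCommitToRetain() comes down to index arithmetic on the completed-commit timeline: once more than commitsRetained commits exist, everything from the (countInstants() - commitsRetained)-th instant onward is retained. A self-contained worked example, assuming nthInstant is 0-based (which the countInstants() > commitsRetained guard suggests):

  import java.util.Arrays;
  import java.util.List;
  import java.util.Optional;

  public class EarliestCommitToRetainDemo {
    static Optional<String> earliestCommitToRetain(List<String> completedCommits, int commitsRetained) {
      if (completedCommits.size() > commitsRetained) {
        // keep the last commitsRetained commits; the first of them is the earliest retained
        return Optional.of(completedCommits.get(completedCommits.size() - commitsRetained));
      }
      return Optional.empty(); // not enough commits yet, so nothing is cleaned
    }

    public static void main(String[] args) {
      List<String> commits = Arrays.asList("001", "002", "003", "004", "005");
      // 5 commits, retain 3 -> commits 003, 004, 005 are kept; 003 is the earliest retained
      System.out.println(earliestCommitToRetain(commits, 3)); // Optional[003]
    }
  }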