Skip to content

Commit

Permalink
Maintain only one version pointer file, add file size limitation to c…
Browse files Browse the repository at this point in the history
…ompaction strategy
  • Loading branch information
danny0405 committed Jul 31, 2023
1 parent a7f8558 commit 9e7266d
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,7 @@ private Option<String> doCompact(List<String> latestSnapshotFiles, int layer) th
if (files.size() >= archiveMergeFilesBatchSize) {
// 2. sort files by min instant time (implies ascending chronological order)
files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
List<String> candidateFiles = files.stream()
.limit(archiveMergeFilesBatchSize)
.collect(Collectors.toList());
List<String> candidateFiles = getCandidateFiles(files, archiveMergeFilesBatchSize);
String compactedFileName = compactedFileName(candidateFiles);

// 3. compaction
Expand All @@ -311,6 +309,25 @@ private Option<String> doCompact(List<String> latestSnapshotFiles, int layer) th
return Option.empty();
}

/**
 * Returns the candidate source files for one compaction run: at most {@code filesBatch}
 * files taken from the head of {@code files}, further capped so that the accumulated
 * gross file size stays under roughly 1GB.
 *
 * @param files      the source file names, already sorted by the caller
 * @param filesBatch the maximum number of files to return
 * @return the candidate file names, in their original order
 * @throws IOException if fetching a file's status from the file system fails
 */
private List<String> getCandidateFiles(List<String> files, int filesBatch) throws IOException {
  // gross size cap for one compaction batch (~1GB)
  final long maxTotalFileLen = 1024L * 1024 * 1000;
  List<String> candidates = new ArrayList<>();
  long totalFileLen = 0L;
  // guard against callers passing fewer files than the batch size
  final int numFiles = Math.min(files.size(), filesBatch);
  for (int i = 0; i < numFiles; i++) {
    String file = files.get(i);
    // the cap is checked before adding the next file, so the batch may exceed the
    // threshold by at most the size of the last file added
    if (totalFileLen > maxTotalFileLen) {
      break;
    }
    long fileLen = metaClient.getFs().getFileStatus(new Path(metaClient.getArchivePath(), file)).getLen();
    totalFileLen += fileLen;
    candidates.add(file);
  }
  return candidates;
}

/**
* Returns a new file name.
*/
Expand Down Expand Up @@ -347,18 +364,13 @@ private void clean(HoodieEngineContext context, int compactedVersions) throws IO
.flatMap(version -> HoodieArchivedTimeline.latestSnapshotFiles(metaClient, version).stream())
.collect(Collectors.toSet());
// delete the manifest file first
List<String> metaFilesToClean = new ArrayList<>();
Arrays.stream(HoodieArchivedTimeline.listAllVersionFiles(metaClient)).forEach(fileStatus -> {
if (!versionsToKeep.contains(HoodieArchivedTimeline.getSnapshotVersion(fileStatus.getPath().getName()))) {
metaFilesToClean.add(fileStatus.getPath().toString());
}
});
List<String> manifestFilesToClean = new ArrayList<>();
Arrays.stream(HoodieArchivedTimeline.listAllManifestFiles(metaClient)).forEach(fileStatus -> {
if (!versionsToKeep.contains(HoodieArchivedTimeline.getManifestVersion(fileStatus.getPath().getName()))) {
metaFilesToClean.add(fileStatus.getPath().toString());
manifestFilesToClean.add(fileStatus.getPath().toString());
}
});
deleteFilesParallelize(metaClient, metaFilesToClean, context, false);
deleteFilesParallelize(metaClient, manifestFilesToClean, context, false);
// delete the archive data files
List<String> dataFilesToClean = Arrays.stream(HoodieArchivedTimeline.listAllMetaFiles(metaClient))
.filter(fileStatus -> !filesToKeep.contains(fileStatus.getPath().getName()))
Expand Down Expand Up @@ -392,14 +404,26 @@ public void updateManifest(List<String> filesToRemove, String fileToAdd) throws
createManifestFile(newFileList, latestVersion);
}

private void createManifestFile(List<String> newFileList, int currentVersion) {
/**
 * Creates a new manifest file recording the given file list, publishing it atomically
 * by writing to a temp file first and renaming, then bumps the version pointer file.
 *
 * @param newFileList    the file names to record in the manifest
 * @param currentVersion the current manifest version, negative if none exists yet
 * @throws IOException if writing, renaming, or updating the version pointer fails
 */
private void createManifestFile(List<String> newFileList, int currentVersion) throws IOException {
  byte[] content = String.join(",", newFileList).getBytes(StandardCharsets.UTF_8);
  // version starts from 1 and increases monotonically
  int newVersion = currentVersion < 0 ? 1 : currentVersion + 1;
  // write to a temp file then rename, so readers never observe a partial manifest
  final Path tempManifestFilePath = HoodieArchivedTimeline.getTempManifestFilePath(metaClient, newVersion);
  final Path manifestFilePath = HoodieArchivedTimeline.getManifestFilePath(metaClient, newVersion);
  FileIOUtils.createFileInPath(metaClient.getFs(), tempManifestFilePath, Option.of(content));
  // FileSystem#rename reports failure via its boolean return instead of throwing;
  // surface it explicitly so a failed publish does not go unnoticed
  if (!metaClient.getFs().rename(tempManifestFilePath, manifestFilePath)) {
    throw new IOException("Failed to rename temp manifest file " + tempManifestFilePath + " to " + manifestFilePath);
  }
  // update version file
  updateVersionFile(newVersion);
}

/**
 * Overwrites the version pointer file with {@code newVersion}: writes the new content
 * to a temp file, deletes the current pointer file, then renames the temp file in place.
 *
 * @param newVersion the snapshot version number to record
 * @throws IOException if the pointer file cannot be replaced
 */
private void updateVersionFile(int newVersion) throws IOException {
  byte[] content = (String.valueOf(newVersion)).getBytes(StandardCharsets.UTF_8);
  final Path tempVersionFilePath = HoodieArchivedTimeline.getTempVersionFilePath(metaClient);
  final Path versionFilePath = HoodieArchivedTimeline.getVersionFilePath(metaClient);
  FileIOUtils.createFileInPath(metaClient.getFs(), tempVersionFilePath, Option.of(content));
  // the old pointer must be removed first because rename does not overwrite;
  // non-recursive delete since the target is a plain file
  metaClient.getFs().delete(versionFilePath, false);
  // FileSystem#rename reports failure via its boolean return instead of throwing;
  // surface it explicitly so a failed pointer update does not go unnoticed
  if (!metaClient.getFs().rename(tempVersionFilePath, versionFilePath)) {
    throw new IOException("Failed to rename temp version file " + tempVersionFilePath + " to " + versionFilePath);
  }
}

public void compactArchiveFiles(List<String> candidateFiles, String compactedFileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exc
}

@Test
public void testCompactionWithCorruptManifestFile() throws Exception {
public void testCompactionWithCorruptVersionFile() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

// do ingestion and trigger archive actions here.
Expand All @@ -501,16 +501,9 @@ public void testCompactionWithCorruptManifestFile() throws Exception {
archiveAndGetCommitsList(writeConfig);
}

// build a compaction archive plan with dummy content
// this plan can not be deserialized.
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
List<String> files = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());

archiver.updateManifest("dummy.parquet");
// remove the version file created by the last manifest update
metaClient.getFs().delete(HoodieArchivedTimeline.getVersionFilePath(metaClient, HoodieArchivedTimeline.latestSnapshotVersion(metaClient)));
// create a version pointer file with invalid version number.
metaClient.getFs().delete(HoodieArchivedTimeline.getVersionFilePath(metaClient));
FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getVersionFilePath(metaClient), Option.of("invalid_version".getBytes(StandardCharsets.UTF_8)));

// check that invalid manifest file will not block archived timeline loading.
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -69,10 +70,10 @@
*
* <pre>
* t111, t112 ... t120 ... ->
* \ /
* \ /
* |
* V
* \ /
* \ /
* |
* V
* t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0
* \ /
* \ /
Expand All @@ -81,14 +82,14 @@
* t11_t100_1.parquet L1
*
* manifest_1, manifest_2, ... manifest_12
* | | |
* V V V
* _version_1, _version_2, ... _version_12
* |
* V
* _version_
* </pre>
*
* <p><h2>The LSM Tree Compaction</h2>
* Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flush as a large file in the next layer.
* We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1,
* Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flush as a compacted file in the next layer.
* We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per file in L1,
* so 3000 instants could be represented as 3 parquets in L2, it is pretty fast if we use concurrent read.
*
* <p>The benchmark shows 1000 instants read cost about 10 ms.
Expand Down Expand Up @@ -125,8 +126,9 @@
* <p>This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
*/
public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
private static final String VERSION_FILE_PREFIX = "_version_"; // _version_[N]
private static final String VERSION_FILE_NAME = "_version_"; // _version_
private static final String MANIFEST_FILE_PREFIX = "manifest_"; // manifest_[N]
private static final String TEMP_FILE_SUFFIX = ".tmp";
public static final int FILE_LAYER_ZERO = 0;
private static final Pattern ARCHIVE_FILE_PATTERN =
Pattern.compile("^(\\d+)_(\\d+)_(\\d)\\.parquet");
Expand Down Expand Up @@ -347,16 +349,28 @@ private static boolean isFileInRange(TimeRangeFilter filter, String fileName) {
* Returns the latest snapshot version.
*/
public static int latestSnapshotVersion(HoodieTableMetaClient metaClient) throws IOException {
  final Path versionFile = getVersionFilePath(metaClient);
  if (metaClient.getFs().exists(versionFile)) {
    try {
      // the pointer file stores the version number as a plain UTF-8 string
      Option<byte[]> bytes = FileIOUtils.readDataFromPath(metaClient.getFs(), versionFile);
      if (bytes.isPresent()) {
        return Integer.parseInt(new String(bytes.get(), StandardCharsets.UTF_8));
      }
    } catch (Exception e) {
      // corrupt/unreadable pointer file: fall back to manifest file listing below
      LOG.warn("Error reading version file {}", versionFile, e);
    }
  }
  // no (valid) pointer file: derive the latest version from the manifest files, -1 if none
  int latest = -1;
  for (int version : allSnapshotVersions(metaClient)) {
    latest = Math.max(latest, version);
  }
  return latest;
}

/**
* Returns all the valid snapshot versions.
*/
public static List<Integer> allSnapshotVersions(HoodieTableMetaClient metaClient) throws IOException {
return Arrays.stream(metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter()))
return Arrays.stream(metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getManifestFilePathFilter()))
.map(fileStatus -> fileStatus.getPath().getName())
.map(HoodieArchivedTimeline::getSnapshotVersion)
.map(HoodieArchivedTimeline::getManifestVersion)
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -388,18 +402,27 @@ public static Path getManifestFilePath(HoodieTableMetaClient metaClient, int sna
return new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + snapshotVersion);
}

/**
 * Returns the temp manifest file path for the given snapshot version, first deleting
 * any leftover temp file from a previously failed attempt.
 *
 * @throws IOException if checking for or deleting a stale temp file fails
 */
public static Path getTempManifestFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) throws IOException {
  Path path = new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + snapshotVersion + TEMP_FILE_SUFFIX);
  if (metaClient.getFs().exists(path)) {
    // non-recursive delete: the target is a plain file, and the single-argument
    // FileSystem#delete(Path) overload is deprecated
    metaClient.getFs().delete(path, false);
  }
  return path;
}

/**
* Returns the full version file path with given version number.
*/
public static Path getVersionFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) {
return new Path(metaClient.getArchivePath(), VERSION_FILE_PREFIX + snapshotVersion);
public static Path getVersionFilePath(HoodieTableMetaClient metaClient) {
return new Path(metaClient.getArchivePath(), VERSION_FILE_NAME);
}

/**
* List all the version files.
*/
public static FileStatus[] listAllVersionFiles(HoodieTableMetaClient metaClient) throws IOException {
return metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter());
public static Path getTempVersionFilePath(HoodieTableMetaClient metaClient) throws IOException {
Path path = new Path(metaClient.getArchivePath(), VERSION_FILE_NAME + TEMP_FILE_SUFFIX);
if (metaClient.getFs().exists(path)) {
metaClient.getFs().delete(path);
}
return path;
}

/**
Expand All @@ -417,13 +440,6 @@ public static FileStatus[] listAllMetaFiles(HoodieTableMetaClient metaClient) th
new Path(metaClient.getArchivePath() + "/*.parquet"));
}

/**
* Parse the snapshot version from the version file name.
*/
public static int getSnapshotVersion(String fileName) {
return Integer.parseInt(fileName.split("_")[2]);
}

/**
* Parse the snapshot version from the manifest file name.
*/
Expand Down Expand Up @@ -480,18 +496,11 @@ public static boolean isFileFromLayer(String fileName, int layer) {
return getFileLayer(fileName) == layer;
}

/**
* Returns a path filter for the version pointer files.
*/
public static PathFilter getVersionFilePathFilter() {
return path -> path.getName().startsWith(VERSION_FILE_PREFIX);
}

/**
 * Returns a path filter for the manifest files, excluding in-flight temp files
 * so that readers never pick up a manifest that is still being written.
 */
public static PathFilter getManifestFilePathFilter() {
  return path -> path.getName().startsWith(MANIFEST_FILE_PREFIX) && !path.getName().endsWith(TEMP_FILE_SUFFIX);
}

// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase {
}

val benchmark = new HoodieBenchmark("pref load archived instants", commitsNum, 3)
benchmark.addCase("read shim instants") { _ =>
benchmark.addCase("read slim instants") { _ =>
new HoodieArchivedTimeline(metaClient)
}
benchmark.addCase("read instants with commit metadata") { _ =>
Expand Down

0 comments on commit 9e7266d

Please sign in to comment.