From e88d7dc94591bed148842a95f695833aec8e727c Mon Sep 17 00:00:00 2001 From: danny0405 Date: Mon, 17 Jul 2023 19:50:11 +0800 Subject: [PATCH] Add snapshot isolation for Archived timeline --- .../hudi/client/HoodieTimelineArchiver.java | 197 +++++++++--------- .../hudi/client/utils/InstantTriple.java | 10 +- .../client/utils/MetadataConversionUtils.java | 11 +- .../org/apache/hudi/DummyInstantTriple.java | 28 +++ .../hudi/io/TestHoodieTimelineArchiver.java | 177 ++-------------- .../timeline/HoodieArchivedTimeline.java | 114 +++++++--- .../ArchivedTimelineReadBenchmark.scala | 43 ++-- 7 files changed, 257 insertions(+), 323 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyInstantTriple.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index f288e8ecc8d00..5de9c882a66eb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -20,13 +20,11 @@ package org.apache.hudi.client; import org.apache.hudi.avro.model.HoodieArchivedMetaEntryV2; -import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.InstantTriple; import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieCleaningPolicy; @@ -38,7 +36,6 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.ClusteringUtils; @@ -72,6 +69,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.time.Instant; import java.util.ArrayList; @@ -87,9 +85,6 @@ import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.NOT_PARSABLE_TIMESTAMPS; import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.parseDateFromInstantTime; -import static org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.FILE_LAYER_ONE; -import static org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.getMaxInstantTime; -import static org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.getMinInstantTime; import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN; import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS; @@ -206,7 +201,7 @@ private HoodieFileWriter openWriter(Path filePath) { return HoodieFileWriterFactory.getFileWriter("", filePath, metaClient.getHadoopConf(), getOrCreateWriterConfig(), HoodieArchivedMetaEntryV2.getClassSchema(), table.getTaskContextSupplier(), 
HoodieRecordType.AVRO); } catch (IOException e) { - throw new HoodieException("Unable to initialize HoodieLogFormat writer", e); + throw new HoodieException("Unable to initialize archiving writer", e); } } @@ -225,7 +220,6 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc } // Sort again because the cleaning and rollback instants could break the sequence. List instantsToArchive = getInstantsToArchive().sorted().collect(Collectors.toList()); - verifyPartialCompaction(context); boolean success = true; if (!instantsToArchive.isEmpty()) { LOG.info("Archiving instants " + instantsToArchive); @@ -284,50 +278,40 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc */ @VisibleForTesting public void compactAndClean(HoodieEngineContext context) throws IOException { - Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME); // List all archive files in L0 - FileStatus[] fsStatuses = metaClient.getFs().listStatus( - new Path(metaClient.getArchivePath()), HoodieArchivedTimeline.getLayerZeroPathFilter()); + List latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient); + int layer = 0; + Option compactedFileName = doCompact(latestSnapshotFiles, layer); + while (compactedFileName.isPresent()) { + latestSnapshotFiles.add(compactedFileName.get()); + compactedFileName = doCompact(latestSnapshotFiles, ++layer); + } + + // cleaning + clean(context, layer + 1); + } + + private Option doCompact(List latestSnapshotFiles, int layer) throws IOException { + List files = latestSnapshotFiles + .stream().filter(file -> HoodieArchivedTimeline.isFileFromLayer(file, layer)).collect(Collectors.toList()); int archiveMergeFilesBatchSize = config.getArchiveMergeFilesBatchSize(); - if (fsStatuses.length >= archiveMergeFilesBatchSize) { + if (files.size() >= archiveMergeFilesBatchSize) { // Sort files by min instant time (implies ascending chronological order) - Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveParquetVersionComparator()); - // before merge archive files build merge plan - List candidateFiles = Arrays.stream(fsStatuses) - .limit(archiveMergeFilesBatchSize) - .collect(Collectors.toList()); - + files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator()); + List candidateFiles = files.stream() + .limit(archiveMergeFilesBatchSize) + .collect(Collectors.toList()); String compactedFileName = compactedFileName(candidateFiles); - List candidateFilePaths = candidateFiles.stream() - .map(fileStatus -> fileStatus.getPath().toString()) - .collect(Collectors.toList()); - - buildArchiveMergePlan(candidateFilePaths, planPath, compactedFileName); // compact archive files compactArchiveFiles(candidateFiles, compactedFileName); - // after compaction, delete the small archive files. - deleteFilesParallelize(metaClient, candidateFilePaths, context, true); - LOG.info("Success to delete replaced small archive files."); - // finally, delete archiveMergePlan which means merging small archive files operation is successful. 
- metaClient.getFs().delete(planPath, false); - LOG.info("Success to merge small archive files."); - - // cleaning - FileStatus[] layerOneFileStatuses = metaClient.getFs().listStatus( - new Path(metaClient.getArchivePath()), HoodieArchivedTimeline.getLayerOnePathFilter()); - long currentTimeMillis = System.currentTimeMillis(); - List filesToPurge = Arrays.stream(layerOneFileStatuses) - .filter(fileStatus -> isOverdueFile(currentTimeMillis, fileStatus)) - .map(fileStatus -> fileStatus.getPath().toString()) - .collect(Collectors.toList()); - if (filesToPurge.size() > 0) { - deleteFilesParallelize(metaClient, filesToPurge, context, true); - LOG.info("Success to clean history archive files: " + filesToPurge); - } + updateManifest(candidateFiles, compactedFileName); + LOG.info("Finishes compaction of archive files: " + candidateFiles); + return Option.of(compactedFileName); } + return Option.empty(); } /** @@ -348,12 +332,13 @@ private static String newFileName(String minInstant, String maxInstant, int laye * Returns a new file name. */ @VisibleForTesting - public static String compactedFileName(List files) { - String minInstant = files.stream().map(fileStatus -> getMinInstantTime(fileStatus.getPath().getName())) + public static String compactedFileName(List files) { + String minInstant = files.stream().map(HoodieArchivedTimeline::getMinInstantTime) .min(Comparator.naturalOrder()).get(); - String maxInstant = files.stream().map(fileStatus -> getMaxInstantTime(fileStatus.getPath().getName())) + String maxInstant = files.stream().map(HoodieArchivedTimeline::getMaxInstantTime) .max(Comparator.naturalOrder()).get(); - return newFileName(minInstant, maxInstant, FILE_LAYER_ONE); + int currentLayer = HoodieArchivedTimeline.getFileLayer(files.get(0)); + return newFileName(minInstant, maxInstant, currentLayer + 1); } /** @@ -361,76 +346,79 @@ public static String compactedFileName(List files) { * * @param context HoodieEngineContext used for parallelize to delete small archive files if necessary. */ - private void verifyPartialCompaction(HoodieEngineContext context) throws IOException { - Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME); - HoodieWrapperFileSystem fs = metaClient.getFs(); - // If plan exist, last merge small archive files was failed. - // we need to revert or complete last action. - if (fs.exists(planPath)) { - HoodieMergeArchiveFilePlan plan; - try { - plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fs, planPath).get(), HoodieMergeArchiveFilePlan.class); - } catch (IOException e) { - LOG.warn("Parsing merge archive plan failed.", e); - // Reading partial plan file which means last merge action is failed during writing plan file. - fs.delete(planPath); - return; - } - Path compactedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName()); - List candidates = plan.getCandidate().stream().map(Path::new).collect(Collectors.toList()); - if (candidateAllExists(candidates)) { - // Last merge action is failed during writing merged archive file. - // But all the small archive files are not deleted. - // Revert last action by deleting mergedArchiveFile if existed. - if (fs.exists(compactedArchiveFile)) { - fs.delete(compactedArchiveFile, false); + private void clean(HoodieEngineContext context, int compactedVersions) throws IOException { + // if there are more than 3 version of snapshots, clean the oldest files. 
+ List allSnapshotVersions = HoodieArchivedTimeline.allSnapshotVersions(metaClient); + int numVersionsToKeep = 3 + compactedVersions; // should make the threshold configurable. + if (allSnapshotVersions.size() > numVersionsToKeep) { + allSnapshotVersions.sort((v1, v2) -> v2 - v1); + List versionsToKeep = allSnapshotVersions.subList(0, numVersionsToKeep); + Set filesToKeep = versionsToKeep.stream() + .flatMap(version -> HoodieArchivedTimeline.latestSnapshotFiles(metaClient, version).stream()) + .collect(Collectors.toSet()); + // delete the manifest file first + List metaFilesToClean = new ArrayList<>(); + Arrays.stream(HoodieArchivedTimeline.listAllVersionFiles(metaClient)).forEach(fileStatus -> { + if (!versionsToKeep.contains(HoodieArchivedTimeline.getSnapshotVersion(fileStatus.getPath().getName()))) { + metaFilesToClean.add(fileStatus.getPath().toString()); } - } else { - // Last merge action is failed during deleting small archive files. - // But the merged files is completed. - // Try to complete last action - if (fs.exists(compactedArchiveFile)) { - deleteFilesParallelize(metaClient, plan.getCandidate(), context, true); + }); + Arrays.stream(HoodieArchivedTimeline.listAllManifestFiles(metaClient)).forEach(fileStatus -> { + if (!versionsToKeep.contains(HoodieArchivedTimeline.getManifestVersion(fileStatus.getPath().getName()))) { + metaFilesToClean.add(fileStatus.getPath().toString()); } - } - - fs.delete(planPath); + }); + deleteFilesParallelize(metaClient, metaFilesToClean, context, false); + // delete the archive data files + List dataFilesToClean = Arrays.stream(HoodieArchivedTimeline.listAllDataFiles(metaClient)) + .filter(fileStatus -> !filesToKeep.contains(fileStatus.getPath().getName())) + .map(fileStatus -> fileStatus.getPath().toString()) + .collect(Collectors.toList()); + deleteFilesParallelize(metaClient, dataFilesToClean, context, false); } } - /** - * If all the candidate small archive files existed, last merge operation was failed during writing the merged archive file. - * If at least one of candidate small archive files existed, the merged archive file was created and last operation was failed during deleting the small archive files. - */ - private boolean candidateAllExists(List candidates) throws IOException { - for (Path archiveFile : candidates) { - if (!metaClient.getFs().exists(archiveFile)) { - // candidate is deleted - return false; - } - } - return true; + public void updateManifest(String fileToAdd) throws IOException { + // 1. read the latest manifest version file; + // 2. read the latest manifest file for valid files; + // 3. add this new file to the existing file list from step2. + int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient); + List latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient, latestVersion); + List newFileList = new ArrayList<>(latestSnapshotFiles); + newFileList.add(fileToAdd); + createManifestFile(newFileList, latestVersion); + } + + public void updateManifest(List filesToRemove, String fileToAdd) throws IOException { + // 1. read the latest manifest version file; + // 2. read the latest manifest file for valid files; + // 3. remove files to the existing file list from step2; + // 4. add this new file to the existing file list from step2. 
+ int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient); + List latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient, latestVersion); + List newFileList = new ArrayList<>(latestSnapshotFiles); + newFileList.removeAll(filesToRemove); + newFileList.add(fileToAdd); + createManifestFile(newFileList, latestVersion); } - public void buildArchiveMergePlan(List compactCandidates, Path planPath, String compactedFileName) throws IOException { - LOG.info("Start to build archive merge plan."); - HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder() - .setCandidate(compactCandidates) - .setMergedArchiveFileName(compactedFileName) - .build(); - Option content = TimelineMetadataUtils.serializeAvroMetadata(plan, HoodieMergeArchiveFilePlan.class); - // building merge archive files plan. - FileIOUtils.createFileInPath(metaClient.getFs(), planPath, content); - LOG.info("Success to build archive merge plan."); + private void createManifestFile(List newFileList, int currentVersion) { + byte[] content = String.join(",", newFileList).getBytes(StandardCharsets.UTF_8); + // version starts from 1 and increases monotonically + int newVersion = currentVersion < 0 ? 1 : currentVersion + 1; + // create manifest file + FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getManifestFilePath(metaClient, newVersion), Option.of(content)); + // create version file + FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getVersionFilePath(metaClient, newVersion), Option.empty()); } - public void compactArchiveFiles(List candidateFiles, String compactedFileName) { + public void compactArchiveFiles(List candidateFiles, String compactedFileName) { LOG.info("Starting to merge small archive files."); try (HoodieFileWriter writer = openWriter(new Path(metaClient.getArchivePath(), compactedFileName))) { - for (FileStatus fileStatus : candidateFiles) { + for (String fileName : candidateFiles) { // Read the archived file try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) - .getFileReader(metaClient.getHadoopConf(), fileStatus.getPath())) { + .getFileReader(metaClient.getHadoopConf(), new Path(metaClient.getArchivePath(), fileName))) { // Read the meta entry try (ClosableIterator iterator = reader.getIndexedRecordIterator(HoodieArchivedMetaEntryV2.getClassSchema(), HoodieArchivedMetaEntryV2.getClassSchema())) { while (iterator.hasNext()) { @@ -696,7 +684,7 @@ public void archive(HoodieEngineContext context, List instants) t try { deleteAnyLeftOverMarkers(context, triple); // in local FS and HDFS, there could be empty completed instants due to crash. 
- final HoodieArchivedMetaEntryV2 metaEntry = MetadataConversionUtils.createArchivedMetaEntry(triple, metaClient, !table.getActiveTimeline().isEmpty(triple.getCompleted())); + final HoodieArchivedMetaEntryV2 metaEntry = MetadataConversionUtils.createArchivedMetaEntry(triple, metaClient); writer.write(metaEntry.getInstantTime(), new HoodieAvroIndexedRecord(metaEntry), wrapperSchema); } catch (Exception e) { LOG.error("Failed to archive instant: " + triple.getInstantTime(), e); @@ -705,6 +693,7 @@ public void archive(HoodieEngineContext context, List instants) t } } } + updateManifest(filePath.getName()); } catch (Exception e) { throw new HoodieCommitException("Failed to archive commits", e); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/InstantTriple.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/InstantTriple.java index d93d596a785d1..9fc99dabeaf65 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/InstantTriple.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/InstantTriple.java @@ -42,7 +42,7 @@ public class InstantTriple implements Serializable, Comparable { /** * The constructor. */ - private InstantTriple(@Nullable HoodieInstant requested, @Nullable HoodieInstant inflight, HoodieInstant completed) { + protected InstantTriple(@Nullable HoodieInstant requested, @Nullable HoodieInstant inflight, HoodieInstant completed) { this.requested = requested; this.inflight = inflight; this.completed = completed; @@ -91,8 +91,12 @@ public String getCompletionTime() { return this.completed.getStateTransitionTime(); } - public byte[] getCommitMetadata(HoodieTableMetaClient metaClient) { - return metaClient.getActiveTimeline().getInstantDetails(this.completed).get(); + public Option getCommitMetadata(HoodieTableMetaClient metaClient) { + Option content = metaClient.getActiveTimeline().getInstantDetails(this.completed); + if (content.isPresent() && content.get().length == 0) { + return Option.empty(); + } + return content; } public Option getRequestedCommitMetadata(HoodieTableMetaClient metaClient) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java index 9960b87170978..561e4a32cc27f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java @@ -136,14 +136,12 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInst return archivedMetaWrapper; } - public static HoodieArchivedMetaEntryV2 createArchivedMetaEntry(InstantTriple triple, HoodieTableMetaClient metaClient, boolean validCommit) { + public static HoodieArchivedMetaEntryV2 createArchivedMetaEntry(InstantTriple triple, HoodieTableMetaClient metaClient) { HoodieArchivedMetaEntryV2 metaEntryV2 = new HoodieArchivedMetaEntryV2(); metaEntryV2.setInstantTime(triple.getInstantTime()); metaEntryV2.setCompletionTime(triple.getCompletionTime()); metaEntryV2.setAction(triple.getAction()); - if (validCommit) { - metaEntryV2.setMetadata(ByteBuffer.wrap(triple.getCommitMetadata(metaClient))); - } + triple.getCommitMetadata(metaClient).ifPresent(commitMetadata -> metaEntryV2.setMetadata(ByteBuffer.wrap(commitMetadata))); switch (triple.getAction()) { case 
HoodieTimeline.CLEAN_ACTION: { metaEntryV2.setPlan(ByteBuffer.wrap(triple.getCleanPlan(metaClient))); @@ -152,10 +150,7 @@ public static HoodieArchivedMetaEntryV2 createArchivedMetaEntry(InstantTriple tr case HoodieTimeline.REPLACE_COMMIT_ACTION: { // we may have cases with empty HoodieRequestedReplaceMetadata e.g. insert_overwrite_table or insert_overwrite // without clustering. However, we should revisit the requested commit file standardization - Option requestedReplaceMetadata = triple.getRequestedCommitMetadata(metaClient); - if (requestedReplaceMetadata.isPresent()) { - metaEntryV2.setPlan(ByteBuffer.wrap(requestedReplaceMetadata.get())); - } + triple.getRequestedCommitMetadata(metaClient).ifPresent(metadata -> metaEntryV2.setPlan(ByteBuffer.wrap(metadata))); break; } case HoodieTimeline.COMPACTION_ACTION: { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyInstantTriple.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyInstantTriple.java new file mode 100644 index 0000000000000..023c24e4ba6d8 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyInstantTriple.java @@ -0,0 +1,28 @@ +package org.apache.hudi; + +import org.apache.hudi.client.utils.InstantTriple; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; + +/** + * Instant triple for testing. + */ +public class DummyInstantTriple extends InstantTriple { + private final byte[] commitMetadata; + + /** + * Only for testing purpose. + */ + public DummyInstantTriple(HoodieInstant completed, byte[] commitMetadata) { + super(new HoodieInstant(HoodieInstant.State.REQUESTED, completed.getAction(), completed.getTimestamp()), + new HoodieInstant(HoodieInstant.State.INFLIGHT, completed.getAction(), completed.getTimestamp()), + completed); + this.commitMetadata = commitMetadata; + } + + @Override + public Option getCommitMetadata(HoodieTableMetaClient metaClient) { + return Option.of(this.commitMetadata); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 7dd56a5e09ee1..e08fdfeec0b61 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -61,7 +61,6 @@ import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Disabled; @@ -104,9 +103,9 @@ import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable; import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { @@ -492,7 +491,7 @@ public void testSavepointWithArchival(boolean 
archiveBeyondSavepoint) throws Exc } @Test - public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed() throws Exception { + public void testCompactionWithCorruptManifestFile() throws Exception { HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3); // do ingestion and trigger archive actions here. @@ -505,23 +504,19 @@ public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed() throws Except // this plan can not be deserialized. HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); - FileStatus[] fsStatuses = metaClient.getFs().globStatus( - new Path(metaClient.getArchivePath() + "/*.parquet")); - Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveParquetVersionComparator()); - List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); - - Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME); - archiver.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1"); - String s = "Dummy Content"; - // stain the current merge plan file. - FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes())); - - // check that damaged plan file will not block archived timeline loading. + List files = HoodieArchivedTimeline.latestSnapshotFiles(metaClient); + files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator()); + + archiver.updateManifest("dummy.parquet"); + // remove the version file created by the last manifest update + metaClient.getFs().delete(HoodieArchivedTimeline.getVersionFilePath(metaClient, HoodieArchivedTimeline.latestSnapshotVersion(metaClient))); + + // check that invalid manifest file will not block archived timeline loading. HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload(); assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants()); - // trigger several archive after left damaged merge small archive file plan. + // trigger several archive with the invalid manifest file. for (int i = 1; i < 10; i++) { testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); archiveAndGetCommitsList(writeConfig); @@ -534,15 +529,15 @@ public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed() throws Except // check instant number assertEquals(4 * 3 + 14, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants()); - // if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline. + // if there are damaged archive files and damaged plan, hoodie can still load correctly. 
Path damagedFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet"); - FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes())); + FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of("dummy".getBytes())); - assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); + assertDoesNotThrow(() -> metaClient.getArchivedTimeline().reload(), "Archived timeline can skip the invalid data and manifest files smartly"); } @Test - public void testMergeSmallArchiveFilesRecoverFromMergeFailed() throws Exception { + public void testCompactionRecoverWithoutManifestFile() throws Exception { HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3); // do ingestion and trigger archive actions here. @@ -554,122 +549,36 @@ public void testMergeSmallArchiveFilesRecoverFromMergeFailed() throws Exception // do a single merge small archive files HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); - FileStatus[] fsStatuses = metaClient.getFs().listStatus( - new Path(metaClient.getArchivePath()), HoodieArchivedTimeline.getLayerZeroPathFilter()); - Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveParquetVersionComparator()); - List candidateFiles = Arrays.stream(fsStatuses).collect(Collectors.toList()); - List candidateFilePaths = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); + List candidateFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient); + candidateFiles.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator()); String compactedFileName = HoodieTimelineArchiver.compactedFileName(candidateFiles); - archiver.buildArchiveMergePlan(candidateFilePaths, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), compactedFileName); archiver.compactArchiveFiles(candidateFiles, compactedFileName); // check loading archived and active timeline success HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload(); assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.reload().countInstants()); - - String s = "Dummy Content"; - // stain the current merged archive file. - Path compactedFilePath = new Path(metaClient.getArchivePath(), compactedFileName); - metaClient.getFs().delete(compactedFilePath); - FileIOUtils.createFileInPath(metaClient.getFs(), compactedFilePath, Option.of(s.getBytes())); - - // do another archive actions with compaction triggered. - for (int i = 1; i < 10; i++) { - testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); - archiveAndGetCommitsList(writeConfig); - } - - // check result. - // we need to load archived timeline successfully and ignore the parsing damage merged archive files exception. - HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false); - HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload(); - - assertEquals(14 + 4 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants()); - - // if there is any corrupt compaction archive file and other regular corrupt archive file. 
- // hoodie need throw ioe while loading archived timeline because of parsing the corrupt archive file. - Path corruptFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet"); - FileIOUtils.createFileInPath(metaClient.getFs(), corruptFile, Option.of(s.getBytes())); - - assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); } @Test - public void testMergeSmallArchiveFilesRecoverFromDeleteFailed() throws Exception { + public void testCompactionCleaning() throws Exception { HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3); // do ingestion and trigger archive actions here. - for (int i = 1; i < 10; i++) { + for (int i = 1; i < 19; i++) { testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); archiveAndGetCommitsList(writeConfig); } - - // do a single merge small archive files - HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); - FileStatus[] fsStatuses = metaClient.getFs().listStatus( - new Path(metaClient.getArchivePath()), HoodieArchivedTimeline.getLayerZeroPathFilter()); - Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveParquetVersionComparator()); - List candidateFiles = Arrays.stream(fsStatuses).collect(Collectors.toList()); - List candidateFilePaths = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); - - String compactedFileName = HoodieTimelineArchiver.compactedFileName(candidateFiles); - archiver.buildArchiveMergePlan(candidateFilePaths, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), compactedFileName); - archiver.compactArchiveFiles(candidateFiles, compactedFileName); - - // delete only one of the small archive file to simulate delete action failed. - metaClient.getFs().delete(fsStatuses[0].getPath()); + // now we have version 6, 7, 8, 9 version of snapshots // loading archived timeline and active timeline success HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload(); - assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants()); - - // do another archive actions with merge small archive files. - for (int i = 1; i < 10; i++) { - testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); - archiveAndGetCommitsList(writeConfig); - } - - // check result. - HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false); - HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload(); - - assertEquals(14 + 4 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants()); - } - - @Test - public void testLoadArchiveTimelineWithDamagedPlanFile() throws Exception { - HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3); - - // do ingestion and trigger archive actions here. - int numInstant = 12; - for (int i = 1; i < 12; i++) { - testTable.doWriteOperation( - "000000" + String.format("%02d", i), - WriteOperationType.UPSERT, i == 1 ? 
Arrays.asList("p1", "p2") : Collections.emptyList(), - Arrays.asList("p1", "p2"), - 2); - archiveAndGetCommitsList(writeConfig); - } - - Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME); - String s = "Dummy Content"; - // stain the current merge plan file. - FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes())); - - // check that damaged plan file will not block archived timeline loading. - HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); - HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload(); - assertEquals(5 * 3 + 6, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants()); - - // if there are corrupt archive files and corrupt plan, hoodie needs throw ioe while loading the archived timeline. - Path corruptFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet"); - FileIOUtils.createFileInPath(metaClient.getFs(), corruptFile, Option.of(s.getBytes())); + assertEquals(4 * 3 + 14, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants()); - assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); + assertEquals(9, HoodieArchivedTimeline.latestSnapshotVersion(metaClient)); + assertEquals(Arrays.asList(6, 7, 8, 9), HoodieArchivedTimeline.allSnapshotVersions(metaClient).stream().sorted().collect(Collectors.toList())); } @Test @@ -753,46 +662,6 @@ public static CompletableFuture allOfTerminateOnFailure(List candidateFiles = Arrays.stream(fsStatuses).collect(Collectors.toList()); - List candidateFilePaths = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); - String compactedFileName = HoodieTimelineArchiver.compactedFileName(candidateFiles); - - archiver.buildArchiveMergePlan(candidateFilePaths, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), compactedFileName); - archiver.compactArchiveFiles(candidateFiles, compactedFileName); - - String s = "Dummy Content"; - // stain the current merged archive file. - Path compactedFilePath = new Path(metaClient.getArchivePath(), compactedFileName); - metaClient.getFs().delete(compactedFilePath); - FileIOUtils.createFileInPath(metaClient.getFs(), compactedFilePath, Option.of(s.getBytes())); - - // if there's only one corrupt compaction archive file, ignores the exception while reading this corrupt file. - HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false); - HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline(); - - assertEquals(4 + 5 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants()); - - // if there are a corrupt compaction archive files and other regular corrupt archive file, - // hoodie needs to throw ioe while loading archived timeline because of parsing the corrupt archive file. 
- Path damagedFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet"); - FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes())); - - assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); - } - @ParameterizedTest @ValueSource(booleans = {true, false}) public void testNoArchivalUntilMaxArchiveConfigWithExtraInflightCommits(boolean enableMetadata) throws Exception { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 79a7bff342e26..8a99b6d65f01b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -21,7 +21,6 @@ import org.apache.hudi.avro.model.HoodieArchivedMetaEntryV2; import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ArchivedMetaEntryReadSchemas; @@ -60,6 +59,7 @@ import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the @@ -74,8 +74,9 @@ */ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan"; + private static final String VERSION_FILE_PREFIX = "_version_"; // _version_[N] + private static final String MANIFEST_FILE_PREFIX = "manifest_"; // manifest_[N] public static final int FILE_LAYER_ZERO = 0; - public static final int FILE_LAYER_ONE = 1; private static final Pattern ARCHIVE_FILE_PATTERN = Pattern.compile("^(\\d+)_(\\d+)_(\\d)\\.parquet"); private static final String INSTANT_TIME_ARCHIVED_META_FIELD = "instantTime"; @@ -227,20 +228,16 @@ private List loadInstants( Function commitsFilter) { try { // List all files - FileStatus[] fsStatuses = metaClient.getFs().globStatus( - new Path(metaClient.getArchivePath() + "/*.parquet")); - - // Sort files by min instant time (implies ascending chronological order) - Arrays.sort(fsStatuses, new ArchiveParquetVersionComparator()); + List fileNames = latestSnapshotFiles(metaClient); Map instantsInRange = new ConcurrentHashMap<>(); Schema readSchema = getReadSchema(loadMode); - Arrays.stream(fsStatuses) - .filter(fileStatus -> filter == null || isFileInRange(filter, fileStatus.getPath())) - .parallel().forEach(fileStatus -> { + fileNames.stream() + .filter(fileName -> filter == null || isFileInRange(filter, fileName)) + .parallel().forEach(fileName -> { // Read the archived file try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) - .getFileReader(metaClient.getHadoopConf(), fileStatus.getPath())) { + .getFileReader(metaClient.getHadoopConf(), new Path(metaClient.getArchivePath(), fileName))) { try (ClosableIterator iterator = reader.getIndexedRecordIterator(HoodieArchivedMetaEntryV2.getClassSchema(), readSchema)) { while (iterator.hasNext()) { GenericRecord record = (GenericRecord) iterator.next(); @@ -253,18 +250,8 @@ private List 
loadInstants( } } } - } catch (Exception oriException) { - // merge small archive files may leave uncompleted archive file which will cause exception. - // need to ignore this kind of exception here. - try { - if (!isCorruptCompactionFile(metaClient, fileStatus)) { - throw oriException; - } - } catch (Exception e) { - // If anything wrong during parsing merge archive plan, we need to throw the original exception. - // For example corrupted archive file and corrupted plan are both existed. - throw new HoodieException(oriException); - } + } catch (IOException ioException) { + throw new HoodieIOException("Error open file reader for path: " + new Path(metaClient.getArchivePath(), fileName)); } }); @@ -304,8 +291,7 @@ private static Schema getReadSchema(LoadMode loadMode) { } } - private static boolean isFileInRange(TimeRangeFilter filter, Path path) { - String fileName = path.getName(); + private static boolean isFileInRange(TimeRangeFilter filter, String fileName) { String minInstant = getMinInstantTime(fileName); String maxInstant = getMaxInstantTime(fileName); return filter.isInRange(minInstant) || filter.isInRange(maxInstant); @@ -341,10 +327,10 @@ public boolean isInRange(String instantTime) { /** * Sort files by order of min instant time in file name. */ - public static class ArchiveParquetVersionComparator implements Comparator, Serializable { + public static class ArchiveParquetVersionComparator implements Comparator, Serializable { @Override - public int compare(FileStatus f1, FileStatus f2) { - return f1.getPath().getName().compareTo(f2.getPath().getName()); + public int compare(String f1, String f2) { + return f1.compareTo(f2); } } @@ -372,6 +358,64 @@ private static int getArchivedFileSuffix(FileStatus f) { return 0; } + public static int latestSnapshotVersion(HoodieTableMetaClient metaClient) throws IOException { + return allSnapshotVersions(metaClient).stream().max(Integer::compareTo).orElse(-1); + } + + public static List allSnapshotVersions(HoodieTableMetaClient metaClient) throws IOException { + return Arrays.stream(metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter())) + .map(fileStatus -> fileStatus.getPath().getName()) + .map(HoodieArchivedTimeline::getSnapshotVersion) + .collect(Collectors.toList()); + } + + public static List latestSnapshotFiles(HoodieTableMetaClient metaClient) throws IOException { + int latestVersion = latestSnapshotVersion(metaClient); + return latestSnapshotFiles(metaClient, latestVersion); + } + + /** + * Reads the file list from the manifest file for the latest snapshot. + */ + public static List latestSnapshotFiles(HoodieTableMetaClient metaClient, int latestVersion) { + if (latestVersion < 0) { + // there is no valid snapshot of the timeline. + return Collections.emptyList(); + } + // read and deserialize the valid files. 
+ byte[] content = FileIOUtils.readDataFromPath(metaClient.getFs(), getManifestFilePath(metaClient, latestVersion)).get(); + return Arrays.stream(new String(content).split(",")).collect(Collectors.toList()); + } + + public static Path getManifestFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) { + return new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + snapshotVersion); + } + + public static Path getVersionFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) { + return new Path(metaClient.getArchivePath(), VERSION_FILE_PREFIX + snapshotVersion); + } + + public static FileStatus[] listAllVersionFiles(HoodieTableMetaClient metaClient) throws IOException { + return metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter()); + } + + public static FileStatus[] listAllManifestFiles(HoodieTableMetaClient metaClient) throws IOException { + return metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getManifestFilePathFilter()); + } + + public static FileStatus[] listAllDataFiles(HoodieTableMetaClient metaClient) throws IOException { + return metaClient.getFs().globStatus( + new Path(metaClient.getArchivePath() + "/*.parquet")); + } + + public static int getSnapshotVersion(String fileName) { + return Integer.parseInt(fileName.split("_")[2]); + } + + public static int getManifestVersion(String fileName) { + return Integer.parseInt(fileName.split("_")[1]); + } + public static int getFileLayer(String fileName) { try { Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName); @@ -405,14 +449,16 @@ public static String getMaxInstantTime(String fileName) { } } - public static PathFilter getLayerZeroPathFilter() { - return path -> path.getName().endsWith(HoodieFileFormat.PARQUET.getFileExtension()) - && getFileLayer(path.getName()) == FILE_LAYER_ZERO; + public static boolean isFileFromLayer(String fileName, int layer) { + return getFileLayer(fileName) == layer; + } + + public static PathFilter getVersionFilePathFilter() { + return path -> path.getName().startsWith(VERSION_FILE_PREFIX); } - public static PathFilter getLayerOnePathFilter() { - return path -> path.getName().endsWith(HoodieFileFormat.PARQUET.getFileExtension()) - && getFileLayer(path.getName()) == FILE_LAYER_ONE; + public static PathFilter getManifestFilePathFilter() { + return path -> path.getName().startsWith(MANIFEST_FILE_PREFIX); } @Override diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala index 935bdadcb58a6..ac1dda83b464a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala @@ -20,11 +20,12 @@ package org.apache.spark.sql.execution.benchmark import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hudi.DummyInstantTriple import org.apache.hudi.client.HoodieTimelineArchiver import org.apache.hudi.client.common.HoodieJavaEngineContext import org.apache.hudi.client.utils.InstantTriple import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieTableType, WriteOperationType} -import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieArchivedTimeline} +import 
org.apache.hudi.common.table.timeline.{HoodieArchivedTimeline, HoodieInstant} import org.apache.hudi.common.testutils.{HoodieTestTable, HoodieTestUtils} import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.index.HoodieIndex.IndexType @@ -33,7 +34,6 @@ import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase} import java.util import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` -import scala.jdk.CollectionConverters.seqAsJavaListConverter object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase { @@ -42,8 +42,8 @@ object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase { * Apple M2 * pref load archived instants: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative * ------------------------------------------------------------------------------------------------------------------------ - * read shim instants 19 34 14 0.1 18953.5 1.0X - * read instants with commit metadata 18 24 9 0.1 17694.4 1.1X + * read shim instants 18 32 15 0.1 17914.8 1.0X + * read instants with commit metadata 19 25 5 0.1 19403.1 0.9X */ private def readArchivedInstantsBenchmark(): Unit = { withTempDir(f => { @@ -51,41 +51,44 @@ object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase { val tablePath = new Path(f.getCanonicalPath, tableName).toUri.toString val metaClient = HoodieTestUtils.init(new Configuration(), tablePath, HoodieTableType.COPY_ON_WRITE, tableName) val testTable = HoodieTestTable.of(metaClient) - // generate 1000 instants on timeline - for (i <- 1 to 1000) { - val instantTime = 1000 + i + "" - testTable.doWriteOperation(instantTime, WriteOperationType.INSERT, util.Arrays.asList("par1, par2"), 10) - } + val writeConfig = HoodieWriteConfig.newBuilder().withPath(tablePath) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build()) .withMarkersType("DIRECT") .build() val engineContext = new HoodieJavaEngineContext(new Configuration()) val archiver = new HoodieTimelineArchiver(writeConfig, HoodieJavaTable.create(writeConfig, engineContext).asInstanceOf[HoodieJavaTable[HoodieAvroPayload]]) - // fetch all the instant on the active timeline - val rawActiveTimeline: HoodieActiveTimeline = new HoodieActiveTimeline(metaClient, false) - val instantTriples = rawActiveTimeline.getInstants.groupBy(instant => instant.getTimestamp) - .values - .map(instants => InstantTriple.fromInstants(instants.toSeq.asJava)) - .toSeq.sorted.asJava + + val startTs = 10000000 + val startInstant = startTs + 1 + "" + val metadata = testTable.doWriteOperation(startInstant, WriteOperationType.INSERT, util.Arrays.asList("par1, par2"), 10).toJsonString.getBytes val instantBuffer: util.ArrayList[InstantTriple] = new util.ArrayList[InstantTriple]() - for (i <- 1 to 1000) { - instantBuffer.add(instantTriples.get(i - 1)) - if (i % 10 == 0) { + val commitsNum = 20000 + val batchSize = 10 + for (i <- 1 to commitsNum) { + val instantTime = startTs + i + "" + val instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, instantTime) + instantBuffer.add(new DummyInstantTriple(instant, metadata)) + if (i % batchSize == 0) { // archive 10 instants each time archiver.archive(engineContext, instantBuffer) archiver.compactAndClean(engineContext) instantBuffer.clear() } } - val benchmark = new HoodieBenchmark("pref load archived instants", 1000, 3) + + val benchmark = new HoodieBenchmark("pref load archived instants", commitsNum, 3) benchmark.addCase("read shim instants") { _ 
=> new HoodieArchivedTimeline(metaClient) } benchmark.addCase("read instants with commit metadata") { _ => - new HoodieArchivedTimeline(metaClient, instantTriples.get(0).getInstantTime) + new HoodieArchivedTimeline(metaClient, startInstant) } benchmark.run() + val totalSize = HoodieArchivedTimeline.latestSnapshotFiles(metaClient) + .map(name => metaClient.getFs.getFileStatus(new Path(metaClient.getArchivePath, name)).getLen) + .sum + println("Total file size in bytes: " + totalSize) }) }
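
Note (illustrative, not part of the patch): the snapshot-isolation scheme above writes an immutable manifest_[N] file listing the valid parquet files of snapshot version N, followed by an empty _version_[N] marker; readers resolve the highest _version_ file and load only the files named in the matching manifest, which is what HoodieArchivedTimeline.latestSnapshotVersion/latestSnapshotFiles do in the patch. Below is a minimal stand-alone sketch of that read path, assuming the comma-separated manifest format produced by createManifestFile; the class and method names here are hypothetical stand-ins, not Hudi APIs.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical reader-side sketch of manifest/version snapshot resolution. */
public class ArchivedSnapshotReaderSketch {

  private static final String VERSION_FILE_PREFIX = "_version_";   // _version_[N]
  private static final String MANIFEST_FILE_PREFIX = "manifest_";  // manifest_[N]

  /** Returns the highest snapshot version found in the archive folder, or -1 if none. */
  static int latestSnapshotVersion(Path archiveDir) throws IOException {
    try (Stream<Path> files = Files.list(archiveDir)) {
      return files.map(p -> p.getFileName().toString())
          .filter(name -> name.startsWith(VERSION_FILE_PREFIX))
          .mapToInt(name -> Integer.parseInt(name.substring(VERSION_FILE_PREFIX.length())))
          .max().orElse(-1);
    }
  }

  /** Reads the comma-separated file list of the given snapshot version. */
  static List<String> snapshotFiles(Path archiveDir, int version) throws IOException {
    if (version < 0) {
      return Collections.emptyList(); // no committed snapshot yet
    }
    byte[] content = Files.readAllBytes(archiveDir.resolve(MANIFEST_FILE_PREFIX + version));
    return Arrays.asList(new String(content, StandardCharsets.UTF_8).split(","));
  }

  public static void main(String[] args) throws IOException {
    Path archiveDir = Paths.get(args[0]); // e.g. <table>/.hoodie/archived
    int version = latestSnapshotVersion(archiveDir);
    // Only files named in the manifest are read; a half-written parquet file from a
    // crashed compaction is never referenced, which is the isolation guarantee.
    System.out.println("snapshot " + version + " -> " + snapshotFiles(archiveDir, version));
  }
}

Because the _version_[N] marker is written after manifest_[N] in createManifestFile, a crash between the two writes leaves the new snapshot invisible to readers; testCompactionWithCorruptManifestFile above appears to rely on exactly that ordering when it deletes the freshly created version file and expects timeline loading to fall back to the previous snapshot.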