From 6926ee79a3bffa3112ed060778385b808a7ca10c Mon Sep 17 00:00:00 2001
From: danny0405
Date: Sun, 16 Jul 2023 10:33:19 +0800
Subject: [PATCH] [HUDI-6539] New LSM tree style archived timeline
---
.../hudi/cli/commands/CompactionCommand.java | 19 +-
.../hudi/client/HoodieTimelineArchiver.java | 454 +++++++-------
.../client/LegacyArchivedMetaEntryReader.java | 258 ++++++++
.../hudi/client/utils/InstantTriple.java | 141 +++++
.../client/utils/MetadataConversionUtils.java | 41 +-
.../hudi/config/HoodieArchivalConfig.java | 23 -
.../apache/hudi/config/HoodieWriteConfig.java | 8 -
.../org/apache/hudi/DummyInstantTriple.java | 46 ++
.../hudi/io/TestHoodieTimelineArchiver.java | 248 ++------
.../src/main/avro/HoodieArchivedInstant.avsc | 49 ++
.../common/table/HoodieTableMetaClient.java | 2 +-
.../table/timeline/HoodieActiveTimeline.java | 2 +-
.../timeline/HoodieArchivedTimeline.java | 579 ++++++++++++------
.../common/table/timeline/HoodieInstant.java | 4 +-
.../util/ArchivedMetaEntryReadSchemas.java | 105 ++++
.../io/storage/HoodieAvroFileReaderBase.java | 7 +-
.../io/storage/HoodieAvroHFileReader.java | 2 +-
.../hudi/io/storage/HoodieAvroOrcReader.java | 2 +-
.../io/storage/HoodieAvroParquetReader.java | 2 +-
.../hudi/io/storage/HoodieFileWriter.java | 2 +-
.../common/testutils/HoodieTestTable.java | 9 +
.../ArchivedTimelineReadBenchmark.scala | 98 +++
22 files changed, 1397 insertions(+), 704 deletions(-)
create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/LegacyArchivedMetaEntryReader.java
create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/InstantTriple.java
create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyInstantTriple.java
create mode 100644 hudi-common/src/main/avro/HoodieArchivedInstant.avsc
create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedMetaEntryReadSchemas.java
create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala
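Context for the change below: the new archived timeline is tracked through versioned snapshots. Each archival or compaction run writes a new manifest (a comma-separated list of the valid LSM parquet files, see createManifestFile further down) plus an empty version marker file, and readers resolve the newest version first. A minimal reader-side sketch, assuming only the comma-separated manifest format introduced in this patch (class and method names here are illustrative, not part of the patch):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class ArchivedSnapshotSketch {
      // manifestContent: raw bytes of the manifest file for the latest snapshot version.
      public static List<String> parseManifest(byte[] manifestContent) {
        String content = new String(manifestContent, StandardCharsets.UTF_8);
        // Each entry is an archived parquet file name like "t1_t2_0.parquet".
        return Arrays.asList(content.split(","));
      }
    }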
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index c9cebb1b227f6..0a21dd71d2b46 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -174,8 +174,8 @@ public String compactionShowArchived(
HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
try {
archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime);
- HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
- archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema());
+ HoodieCompactionPlan compactionPlan =
+ TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition);
} finally {
archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime);
@@ -365,17 +365,10 @@ Function compactionPlanReader(
private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
HoodieInstant instant) {
- // filter inflight compaction
- if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())
- && HoodieInstant.State.INFLIGHT.equals(instant.getState())) {
- try {
- return TimelineMetadataUtils.deserializeAvroRecordMetadata(archivedTimeline.getInstantDetails(instant).get(),
- HoodieCompactionPlan.getClassSchema());
- } catch (Exception e) {
- throw new HoodieException(e.getMessage(), e);
- }
- } else {
- return null;
+ try {
+ return TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
+ } catch (Exception e) {
+ throw new HoodieException(e.getMessage(), e);
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 3d41e3011bdd2..751e082c1b871 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -19,33 +19,23 @@
package org.apache.hudi.client;
-import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
-import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
+import org.apache.hudi.avro.model.HoodieArchivedInstant;
import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.InstantTriple;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.fs.StorageSchemes;
-import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
@@ -53,11 +43,17 @@
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
@@ -73,12 +69,14 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -105,9 +103,8 @@ public class HoodieTimelineArchiver {
private static final Logger LOG = LoggerFactory.getLogger(HoodieTimelineArchiver.class);
- private final Path archiveFilePath;
private final HoodieWriteConfig config;
- private Writer writer;
+ private HoodieWriteConfig writerConfig;
private final int maxInstantsToKeep;
private final int minInstantsToKeep;
private final HoodieTable table;
@@ -118,7 +115,6 @@ public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable
this.config = config;
this.table = table;
this.metaClient = table.getMetaClient();
- this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
HoodieTimeline completedCommitsTimeline = table.getCompletedCommitsTimeline();
Option<HoodieInstant> latestCommit = completedCommitsTimeline.lastInstant();
@@ -188,40 +184,24 @@ private Option getEarliestCommitToRetain(Option la
return earliestCommitToRetain;
}
- private Writer openWriter() {
- try {
- if (this.writer == null) {
- return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent())
- .withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
- .withFs(metaClient.getFs()).overBaseCommit("").build();
- } else {
- return this.writer;
- }
- } catch (IOException e) {
- throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
- }
- }
-
- public Writer reOpenWriter() {
- try {
- if (this.writer != null) {
- this.writer.close();
- this.writer = null;
- }
- this.writer = openWriter();
- return writer;
- } catch (IOException e) {
- throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
+ /**
+ * Get or create a writer config for parquet writer.
+ */
+ private HoodieWriteConfig getOrCreateWriterConfig() {
+ if (this.writerConfig == null) {
+ this.writerConfig = HoodieWriteConfig.newBuilder()
+ .withProperties(this.config.getProps())
+ .withPopulateMetaFields(false).build();
}
+ return this.writerConfig;
}
- private void close() {
+ private HoodieFileWriter openWriter(Path filePath) {
try {
- if (this.writer != null) {
- this.writer.close();
- }
+ return HoodieFileWriterFactory.getFileWriter("", filePath, metaClient.getHadoopConf(), getOrCreateWriterConfig(),
+ HoodieArchivedInstant.getClassSchema(), table.getTaskContextSupplier(), HoodieRecordType.AVRO);
} catch (IOException e) {
- throw new HoodieException("Unable to close HoodieLogFormat writer", e);
+ throw new HoodieException("Unable to initialize archiving writer", e);
}
}
@@ -238,11 +218,10 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
// there is no owner or instant time per se for archival.
txnManager.beginTransaction(Option.empty(), Option.empty());
}
- List instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
- verifyLastMergeArchiveFilesIfNecessary(context);
+ // Sort again because the cleaning and rollback instants could break the sequence.
+ List<InstantTriple> instantsToArchive = getInstantsToArchive().sorted().collect(Collectors.toList());
boolean success = true;
if (!instantsToArchive.isEmpty()) {
- this.writer = openWriter();
LOG.info("Archiving instants " + instantsToArchive);
archive(context, instantsToArchive);
LOG.info("Deleting archived instants " + instantsToArchive);
@@ -251,194 +230,208 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
LOG.info("No Instants to archive");
}
- if (shouldMergeSmallArchiveFiles()) {
- mergeArchiveFilesIfNecessary(context);
- }
+ compactAndClean(context);
return success;
} finally {
- close();
if (acquireLock) {
txnManager.endTransaction(Option.empty());
}
}
}
- public boolean shouldMergeSmallArchiveFiles() {
- return config.getArchiveMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme());
- }
-
/**
- * Here Hoodie can merge the small archive files into a new larger one.
- * Only used for filesystem which does not support append operation.
- * The whole merge small archive files operation has four stages:
- * 1. Build merge plan with merge candidates/merged file name infos.
- * 2. Do merge.
- * 3. Delete all the candidates.
- * 4. Delete the merge plan.
+ * Compacts the small archive files into larger ones.
+ *
+ * <p>The parquet naming convention is:
+ *
+ * <pre>${min_instant}_${max_instant}_${level}.parquet</pre>
+ *
+ * <p>The 'min_instant' and 'max_instant' represent the instant time range of the parquet file.
+ * The 'level' represents the level of the LSM tree where the file is located; currently there
+ * is no limit on the number of levels.
+ *
+ * <p>These archived parquet files compose an LSM-tree layout: each parquet file contains the
+ * metadata entries of a consecutive range of instant timestamps. Different parquet files may
+ * have overlapping instant time ranges.
+ *
+ * <pre>
+ *   t1_t2_0.parquet, t3_t4_0.parquet, ... t5_t6_0.parquet       L0 layer
+ *                         \            /
+ *                            \      /
+ *                                |
+ *                                V
+ *                          t3_t6_1.parquet                      L1 layer
+ * </pre>
+ *
+ * <p>Compaction and cleaning: once the number of files in a layer exceeds a threshold N
+ * (the archive merge files batch size, default 10), the oldest N files are replaced with a
+ * compacted file in the next layer. A cleaning action is triggered right after the compaction.
*
* @param context HoodieEngineContext
- * @throws IOException
*/
- private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
- Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
- // Flush remained content if existed and open a new write
- reOpenWriter();
- // List all archive files
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
- // Sort files by version suffix in reverse (implies reverse chronological order)
- Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
+ @VisibleForTesting
+ public void compactAndClean(HoodieEngineContext context) throws IOException {
+ // List all archive files in the latest snapshot.
+ List<String> latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
+ int layer = 0;
+ Option<String> compactedFileName = doCompact(latestSnapshotFiles, layer);
+ while (compactedFileName.isPresent()) {
+ latestSnapshotFiles.add(compactedFileName.get());
+ compactedFileName = doCompact(latestSnapshotFiles, ++layer);
+ }
+
+ // cleaning
+ clean(context, layer + 1);
+ }
+
+ private Option<String> doCompact(List<String> latestSnapshotFiles, int layer) throws IOException {
+ List<String> files = latestSnapshotFiles
+ .stream().filter(file -> HoodieArchivedTimeline.isFileFromLayer(file, layer)).collect(Collectors.toList());
int archiveMergeFilesBatchSize = config.getArchiveMergeFilesBatchSize();
- long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
-
- List mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
-
- if (mergeCandidate.size() >= archiveMergeFilesBatchSize) {
- List candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
- // before merge archive files build merge plan
- String logFileName = computeLogFileName();
- buildArchiveMergePlan(candidateFiles, planPath, logFileName);
- // merge archive files
- mergeArchiveFiles(mergeCandidate);
- // after merge, delete the small archive files.
- deleteFilesParallelize(metaClient, candidateFiles, context, true);
- LOG.info("Success to delete replaced small archive files.");
- // finally, delete archiveMergePlan which means merging small archive files operation is successful.
- metaClient.getFs().delete(planPath, false);
- LOG.info("Success to merge small archive files.");
+
+ if (files.size() >= archiveMergeFilesBatchSize) {
+ // Sort files by min instant time (implies ascending chronological order)
+ files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
+ List<String> candidateFiles = files.stream()
+ .limit(archiveMergeFilesBatchSize)
+ .collect(Collectors.toList());
+ String compactedFileName = compactedFileName(candidateFiles);
+
+ // compact archive files
+ compactArchiveFiles(candidateFiles, compactedFileName);
+ updateManifest(candidateFiles, compactedFileName);
+ LOG.info("Finishes compaction of archive files: " + candidateFiles);
+ return Option.of(compactedFileName);
}
+ return Option.empty();
}
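To make the level selection above concrete, a hedged walkthrough of doCompact with the default batch size of 10: if L0 currently holds 12 files, the 10 oldest (by min instant time) are compacted into one L1 file whose name spans their overall instant range; the 2 newest L0 files and the new L1 file then form the next snapshot. A standalone sketch under those assumptions (the name-parsing helpers are local stand-ins for the HoodieArchivedTimeline helpers used in the patch):

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public class CompactionSelectionSketch {
      static String minInstant(String f) { return f.split("_")[0]; }
      static String maxInstant(String f) { return f.split("_")[1]; }
      static int layer(String f)        { return Integer.parseInt(f.split("_")[2].split("\\.")[0]); }

      // Returns the compacted file name for the given level, or null when the level
      // does not yet hold enough files to trigger a compaction.
      public static String compact(List<String> snapshotFiles, int level, int batchSize) {
        List<String> candidates = snapshotFiles.stream()
            .filter(f -> layer(f) == level)
            .sorted(Comparator.comparing(CompactionSelectionSketch::minInstant))
            .limit(batchSize)
            .collect(Collectors.toList());
        if (candidates.size() < batchSize) {
          return null;
        }
        String min = candidates.stream().map(CompactionSelectionSketch::minInstant).min(Comparator.naturalOrder()).get();
        String max = candidates.stream().map(CompactionSelectionSketch::maxInstant).max(Comparator.naturalOrder()).get();
        return min + "_" + max + "_" + (level + 1) + ".parquet";
      }
    }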
/**
- * Find the latest 'huge archive file' index as a break point and only check/merge newer archive files.
- * Because we need to keep the original order of archive files which is important when loading archived instants with time filter.
- * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function commitsFilter)
- *
- * @param smallFileLimitBytes small File Limit Bytes
- * @param fsStatuses Sort by version suffix in reverse
- * @return merge candidates
+ * Returns true if the file was generated more than one week ago.
*/
- private List getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
- int index = 0;
- for (; index < fsStatuses.length; index++) {
- if (fsStatuses[index].getLen() > smallFileLimitBytes) {
- break;
- }
- }
- return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+ private static boolean isOverdueFile(long currentTimeMillis, FileStatus fileStatus) {
+ return currentTimeMillis - fileStatus.getModificationTime() > 604800000L;
+ }
+
+ /**
+ * Returns a new file name.
+ */
+ private static String newFileName(String minInstant, String maxInstant, int layer) {
+ return minInstant + "_" + maxInstant + "_" + layer + HoodieFileFormat.PARQUET.getFileExtension();
}
/**
- * Get final written archive file name based on storageSchemes which does not support append.
+ * Returns a new file name.
*/
- private String computeLogFileName() throws IOException {
- String logWriteToken = writer.getLogFile().getLogWriteToken();
- HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
- return hoodieLogFile.getFileName();
+ @VisibleForTesting
+ public static String compactedFileName(List<String> files) {
+ String minInstant = files.stream().map(HoodieArchivedTimeline::getMinInstantTime)
+ .min(Comparator.naturalOrder()).get();
+ String maxInstant = files.stream().map(HoodieArchivedTimeline::getMaxInstantTime)
+ .max(Comparator.naturalOrder()).get();
+ int currentLayer = HoodieArchivedTimeline.getFileLayer(files.get(0));
+ return newFileName(minInstant, maxInstant, currentLayer + 1);
}
/**
- * Check/Solve if there is any failed and unfinished merge small archive files operation
+ * Cleans the outdated archive files and snapshot versions.
*
* @param context HoodieEngineContext used for parallelize to delete small archive files if necessary.
- * @throws IOException
*/
- private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
- if (shouldMergeSmallArchiveFiles()) {
- Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
- HoodieWrapperFileSystem fs = metaClient.getFs();
- // If plan exist, last merge small archive files was failed.
- // we need to revert or complete last action.
- if (fs.exists(planPath)) {
- HoodieMergeArchiveFilePlan plan = null;
- try {
- plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fs, planPath).get(), HoodieMergeArchiveFilePlan.class);
- } catch (IOException e) {
- LOG.warn("Parsing merge archive plan failed.", e);
- // Reading partial plan file which means last merge action is failed during writing plan file.
- fs.delete(planPath);
- return;
+ private void clean(HoodieEngineContext context, int compactedVersions) throws IOException {
+ // if there are more than 3 versions of snapshots, clean the oldest files.
+ List<Integer> allSnapshotVersions = HoodieArchivedTimeline.allSnapshotVersions(metaClient);
+ int numVersionsToKeep = 3 + compactedVersions; // should make the threshold configurable.
+ if (allSnapshotVersions.size() > numVersionsToKeep) {
+ allSnapshotVersions.sort((v1, v2) -> v2 - v1);
+ List<Integer> versionsToKeep = allSnapshotVersions.subList(0, numVersionsToKeep);
+ Set<String> filesToKeep = versionsToKeep.stream()
+ .flatMap(version -> HoodieArchivedTimeline.latestSnapshotFiles(metaClient, version).stream())
+ .collect(Collectors.toSet());
+ // delete the manifest and version files first
+ List<String> metaFilesToClean = new ArrayList<>();
+ Arrays.stream(HoodieArchivedTimeline.listAllVersionFiles(metaClient)).forEach(fileStatus -> {
+ if (!versionsToKeep.contains(HoodieArchivedTimeline.getSnapshotVersion(fileStatus.getPath().getName()))) {
+ metaFilesToClean.add(fileStatus.getPath().toString());
}
- Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
- List candidates = plan.getCandidate().stream().map(Path::new).collect(Collectors.toList());
- if (candidateAllExists(candidates)) {
- // Last merge action is failed during writing merged archive file.
- // But all the small archive files are not deleted.
- // Revert last action by deleting mergedArchiveFile if existed.
- if (fs.exists(mergedArchiveFile)) {
- fs.delete(mergedArchiveFile, false);
- }
- } else {
- // Last merge action is failed during deleting small archive files.
- // But the merged files is completed.
- // Try to complete last action
- if (fs.exists(mergedArchiveFile)) {
- deleteFilesParallelize(metaClient, plan.getCandidate(), context, true);
- }
+ });
+ Arrays.stream(HoodieArchivedTimeline.listAllManifestFiles(metaClient)).forEach(fileStatus -> {
+ if (!versionsToKeep.contains(HoodieArchivedTimeline.getManifestVersion(fileStatus.getPath().getName()))) {
+ metaFilesToClean.add(fileStatus.getPath().toString());
}
-
- fs.delete(planPath);
- }
+ });
+ deleteFilesParallelize(metaClient, metaFilesToClean, context, false);
+ // delete the archive data files
+ List<String> dataFilesToClean = Arrays.stream(HoodieArchivedTimeline.listAllMetaFiles(metaClient))
+ .filter(fileStatus -> !filesToKeep.contains(fileStatus.getPath().getName()))
+ .map(fileStatus -> fileStatus.getPath().toString())
+ .collect(Collectors.toList());
+ deleteFilesParallelize(metaClient, dataFilesToClean, context, false);
}
}
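A concrete reading of the retention rule above (an illustration, not part of the patch): if the archival run produced a compacted file at level 1, clean() is called with compactedVersions = 2, so numVersionsToKeep is 5; with 7 snapshot versions present, the two oldest versions, their manifest/version files, and any parquet files referenced only by them are deleted. A small sketch of the kept set:

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public class SnapshotRetentionSketch {
      // Mirrors the selection in clean(): keep the newest (3 + compactedVersions) versions.
      public static List<Integer> versionsToKeep(List<Integer> allVersions, int compactedVersions) {
        int numVersionsToKeep = 3 + compactedVersions;
        return allVersions.stream()
            .sorted(Comparator.reverseOrder()) // newest first
            .limit(numVersionsToKeep)
            .collect(Collectors.toList());
      }
    }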
- /**
- * If all the candidate small archive files existed, last merge operation was failed during writing the merged archive file.
- * If at least one of candidate small archive files existed, the merged archive file was created and last operation was failed during deleting the small archive files.
- */
- private boolean candidateAllExists(List candidates) throws IOException {
- for (Path archiveFile : candidates) {
- if (!metaClient.getFs().exists(archiveFile)) {
- // candidate is deleted
- return false;
- }
- }
- return true;
+ public void updateManifest(String fileToAdd) throws IOException {
+ // 1. read the latest manifest version file;
+ // 2. read the latest manifest file for valid files;
+ // 3. add this new file to the existing file list from step 2.
+ int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient);
+ List<String> latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient, latestVersion);
+ List<String> newFileList = new ArrayList<>(latestSnapshotFiles);
+ newFileList.add(fileToAdd);
+ createManifestFile(newFileList, latestVersion);
+ }
+
+ public void updateManifest(List<String> filesToRemove, String fileToAdd) throws IOException {
+ // 1. read the latest manifest version file;
+ // 2. read the latest manifest file for valid files;
+ // 3. remove the given files from the existing file list from step 2;
+ // 4. add this new file to the existing file list from step 2.
+ int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient);
+ List<String> latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient, latestVersion);
+ List<String> newFileList = new ArrayList<>(latestSnapshotFiles);
+ newFileList.removeAll(filesToRemove);
+ newFileList.add(fileToAdd);
+ createManifestFile(newFileList, latestVersion);
}
- public void buildArchiveMergePlan(List compactCandidate, Path planPath, String compactedArchiveFileName) throws IOException {
- LOG.info("Start to build archive merge plan.");
- HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder()
- .setCandidate(compactCandidate)
- .setMergedArchiveFileName(compactedArchiveFileName)
- .build();
- Option content = TimelineMetadataUtils.serializeAvroMetadata(plan, HoodieMergeArchiveFilePlan.class);
- // building merge archive files plan.
- FileIOUtils.createFileInPath(metaClient.getFs(), planPath, content);
- LOG.info("Success to build archive merge plan");
+ private void createManifestFile(List<String> newFileList, int currentVersion) {
+ byte[] content = String.join(",", newFileList).getBytes(StandardCharsets.UTF_8);
+ // version starts from 1 and increases monotonically
+ int newVersion = currentVersion < 0 ? 1 : currentVersion + 1;
+ // create manifest file
+ FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getManifestFilePath(metaClient, newVersion), Option.of(content));
+ // create version file
+ FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getVersionFilePath(metaClient, newVersion), Option.empty());
}
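For illustration, the manifest written above is just the comma-joined file list, so the write side of a compaction is a simple list edit followed by bumping the version. A runnable sketch with illustrative file names (not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ManifestUpdateSketch {
      // Mirrors updateManifest(filesToRemove, fileToAdd) followed by createManifestFile.
      public static void main(String[] args) {
        List<String> snapshotV3 = new ArrayList<>(Arrays.asList(
            "001_002_0.parquet", "003_004_0.parquet", "005_006_0.parquet"));
        List<String> compacted = Arrays.asList("001_002_0.parquet", "003_004_0.parquet");
        snapshotV3.removeAll(compacted);
        snapshotV3.add("001_004_1.parquet");               // the new compacted L1 file
        String manifestV4 = String.join(",", snapshotV3);  // content of manifest version 4
        System.out.println(manifestV4);                    // 005_006_0.parquet,001_004_1.parquet
      }
    }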
- public void mergeArchiveFiles(List compactCandidate) throws IOException {
+ public void compactArchiveFiles(List<String> candidateFiles, String compactedFileName) {
LOG.info("Starting to merge small archive files.");
- Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
- try {
- List records = new ArrayList<>();
- for (FileStatus fs : compactCandidate) {
+ try (HoodieFileWriter writer = openWriter(new Path(metaClient.getArchivePath(), compactedFileName))) {
+ for (String fileName : candidateFiles) {
// Read the archived file
- try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
- new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
- // Read the avro blocks
- while (reader.hasNext()) {
- HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
- if (records.size() >= this.config.getCommitArchivalBatchSize()) {
- writeToFile(wrapperSchema, records);
+ try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ .getFileReader(metaClient.getHadoopConf(), new Path(metaClient.getArchivePath(), fileName))) {
+ // Read the meta entry
+ try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieArchivedInstant.getClassSchema(), HoodieArchivedInstant.getClassSchema())) {
+ while (iterator.hasNext()) {
+ IndexedRecord record = iterator.next();
+ writer.write(record.get(0).toString(), new HoodieAvroIndexedRecord(record), HoodieArchivedInstant.getClassSchema());
}
}
}
}
- writeToFile(wrapperSchema, records);
} catch (Exception e) {
throw new HoodieCommitException("Failed to merge small archive files", e);
- } finally {
- writer.close();
}
LOG.info("Success to merge small archive files.");
}
- private Map deleteFilesParallelize(HoodieTableMetaClient metaClient, List paths, HoodieEngineContext context, boolean ignoreFailed) {
-
+ private Map<String, Boolean> deleteFilesParallelize(
+ HoodieTableMetaClient metaClient,
+ List<String> paths,
+ HoodieEngineContext context,
+ boolean ignoreFailed) {
return FSUtils.parallelizeFilesProcess(context,
metaClient.getFs(),
config.getArchiveDeleteParallelism(),
@@ -471,7 +464,7 @@ private Stream getCleanInstantsToArchive() {
if (hoodieInstants.size() > this.maxInstantsToKeep) {
return hoodieInstants.subList(0, hoodieInstants.size() - this.minInstantsToKeep);
} else {
- return new ArrayList();
+ return Collections.emptyList();
}
}).flatMap(Collection::stream);
}
@@ -498,9 +491,7 @@ private Stream getCommitInstantsToArchive() throws IOException {
LESSER_THAN, oldestPendingInstant.get().getTimestamp())).findFirst());
// Check if the completed instant is higher than the oldest inflight instant
// in that case update the oldestCommitToRetain to oldestInflight commit time.
- if (!completedCommitBeforeOldestPendingInstant.isPresent()
- || HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
- LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
+ if (!completedCommitBeforeOldestPendingInstant.isPresent()) {
oldestCommitToRetain = oldestPendingInstant;
} else {
oldestCommitToRetain = completedCommitBeforeOldestPendingInstant;
@@ -567,8 +558,7 @@ private Stream getCommitInstantsToArchive() throws IOException {
}
}
- private Stream getInstantsToArchive() throws IOException {
- Stream instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+ private Stream<InstantTriple> getInstantsToArchive() throws IOException {
if (config.isMetaserverEnabled()) {
return Stream.empty();
}
@@ -576,8 +566,10 @@ private Stream getInstantsToArchive() throws IOException {
// For archiving and cleaning instants, we need to include intermediate state files if they exist
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstantsAsStream()
- .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
- HoodieInstant.getComparableAction(i.getAction()))));
+ .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
+ HoodieInstant.getComparableAction(i.getAction()))));
+
+ Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
// If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
// metadata table.
@@ -626,29 +618,26 @@ private Stream getInstantsToArchive() throws IOException {
}
}
- return instants.flatMap(hoodieInstant -> {
+ return instants.map(hoodieInstant -> {
List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction())));
- if (instantsToStream != null) {
- return instantsToStream.stream();
- } else {
- // if a concurrent writer archived the instant
- return Stream.empty();
- }
+ return InstantTriple.fromInstants(instantsToStream);
});
}
- private boolean deleteArchivedInstants(List archivedInstants, HoodieEngineContext context) throws IOException {
+ private boolean deleteArchivedInstants(List<InstantTriple> archivedInstants, HoodieEngineContext context) {
LOG.info("Deleting instants " + archivedInstants);
List<HoodieInstant> pendingInstants = new ArrayList<>();
List<HoodieInstant> completedInstants = new ArrayList<>();
- for (HoodieInstant instant : archivedInstants) {
- if (instant.isCompleted()) {
- completedInstants.add(instant);
- } else {
- pendingInstants.add(instant);
+ for (InstantTriple triple : archivedInstants) {
+ completedInstants.add(triple.getCompleted());
+ if (triple.getRequested() != null) {
+ pendingInstants.add(triple.getRequested());
+ }
+ if (triple.getInflight() != null) {
+ pendingInstants.add(triple.getInflight());
}
}
@@ -678,52 +667,35 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo
return true;
}
- public void archive(HoodieEngineContext context, List instants) throws HoodieCommitException {
- try {
- Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
- LOG.info("Wrapper schema " + wrapperSchema.toString());
- List records = new ArrayList<>();
- for (HoodieInstant hoodieInstant : instants) {
+ public void archive(HoodieEngineContext context, List<InstantTriple> instants) throws HoodieCommitException {
+ Path filePath = new Path(metaClient.getArchivePath(),
+ newFileName(instants.get(0).getInstantTime(), instants.get(instants.size() - 1).getInstantTime(), HoodieArchivedTimeline.FILE_LAYER_ZERO));
+ try (HoodieFileWriter writer = openWriter(filePath)) {
+ Schema wrapperSchema = HoodieArchivedInstant.getClassSchema();
+ LOG.info("Archiving schema " + wrapperSchema.toString());
+ for (InstantTriple triple : instants) {
try {
- deleteAnyLeftOverMarkers(context, hoodieInstant);
- records.add(convertToAvroRecord(hoodieInstant));
- if (records.size() >= this.config.getCommitArchivalBatchSize()) {
- writeToFile(wrapperSchema, records);
- }
+ deleteAnyLeftOverMarkers(context, triple);
+ // in local FS and HDFS, there could be empty completed instants due to crash.
+ final HoodieArchivedInstant metaEntry = MetadataConversionUtils.createArchivedInstant(triple, metaClient);
+ writer.write(metaEntry.getInstantTime(), new HoodieAvroIndexedRecord(metaEntry), wrapperSchema);
} catch (Exception e) {
- LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e);
+ LOG.error("Failed to archive instant: " + triple.getInstantTime(), e);
if (this.config.isFailOnTimelineArchivingEnabled()) {
throw e;
}
}
}
- writeToFile(wrapperSchema, records);
+ updateManifest(filePath.getName());
} catch (Exception e) {
throw new HoodieCommitException("Failed to archive commits", e);
}
}
- private void deleteAnyLeftOverMarkers(HoodieEngineContext context, HoodieInstant instant) {
- WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, instant.getTimestamp());
+ private void deleteAnyLeftOverMarkers(HoodieEngineContext context, InstantTriple instantTriple) {
+ WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, instantTriple.getInstantTime());
if (writeMarkers.deleteMarkerDir(context, config.getMarkersDeleteParallelism())) {
- LOG.info("Cleaned up left over marker directory for instant :" + instant);
- }
- }
-
- private void writeToFile(Schema wrapperSchema, List records) throws Exception {
- if (records.size() > 0) {
- Map header = new HashMap<>();
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
- final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
- List indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
- HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
- writer.appendBlock(block);
- records.clear();
+ LOG.info("Cleaned up left over marker directory for instant :" + instantTriple.getCompleted());
}
}
-
- private IndexedRecord convertToAvroRecord(HoodieInstant hoodieInstant)
- throws IOException {
- return MetadataConversionUtils.createMetaWrapper(hoodieInstant, metaClient);
- }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/LegacyArchivedMetaEntryReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/LegacyArchivedMetaEntryReader.java
new file mode 100644
index 0000000000000..eaad3d120b04c
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/LegacyArchivedMetaEntryReader.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.StreamSupport;
+
+/**
+ * Tools used for migrating to the new LSM-tree-style archived timeline.
+ */
+public class LegacyArchivedMetaEntryReader {
+ private static final Logger LOG = LoggerFactory.getLogger(LegacyArchivedMetaEntryReader.class);
+
+ private static final Pattern ARCHIVE_FILE_PATTERN =
+ Pattern.compile("^\\.commits_\\.archive\\.([0-9]+).*");
+
+ public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan";
+
+ private static final String ACTION_TYPE_KEY = "actionType";
+ private static final String ACTION_STATE = "actionState";
+ private static final String STATE_TRANSITION_TIME = "stateTransitionTime";
+
+ private final HoodieTableMetaClient metaClient;
+
+ private final Map<String, byte[]> readCommits;
+
+ public LegacyArchivedMetaEntryReader(HoodieTableMetaClient metaClient) {
+ this.metaClient = metaClient;
+ this.readCommits = new HashMap<>();
+ }
+
+ /**
+ * @deprecated Use {@code HoodieArchivedTimeline#readCommit} instead.
+ */
+ private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
+ final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
+ final String action = record.get(ACTION_TYPE_KEY).toString();
+ final String stateTransitionTime = (String) record.get(STATE_TRANSITION_TIME);
+ if (loadDetails) {
+ getMetadataKey(action).map(key -> {
+ Object actionData = record.get(key);
+ if (actionData != null) {
+ if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
+ this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData));
+ } else {
+ this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ return null;
+ });
+ }
+ return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action,
+ instantTime, stateTransitionTime);
+ }
+
+ @Nonnull
+ private Option<String> getMetadataKey(String action) {
+ switch (action) {
+ case HoodieTimeline.CLEAN_ACTION:
+ return Option.of("hoodieCleanMetadata");
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ return Option.of("hoodieCommitMetadata");
+ case HoodieTimeline.ROLLBACK_ACTION:
+ return Option.of("hoodieRollbackMetadata");
+ case HoodieTimeline.SAVEPOINT_ACTION:
+ return Option.of("hoodieSavePointMetadata");
+ case HoodieTimeline.COMPACTION_ACTION:
+ case HoodieTimeline.LOG_COMPACTION_ACTION:
+ return Option.of("hoodieCompactionPlan");
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ return Option.of("hoodieReplaceCommitMetadata");
+ case HoodieTimeline.INDEXING_ACTION:
+ return Option.of("hoodieIndexCommitMetadata");
+ default:
+ LOG.error(String.format("Unknown action in metadata (%s)", action));
+ return Option.empty();
+ }
+ }
+
+ /**
+ * This is the method to read selected instants. Do NOT use this directly; use one of the helper methods above.
+ * If loadInstantDetails is set to true, this also updates the 'readCommits' map with commit details.
+ * If filter is specified, only the filtered instants are loaded.
+ * If commitsFilter is specified, only the filtered records are loaded.
+ *
+ * @deprecated Use {@code HoodieArchivedTimeline.loadInstantsV2} instead.
+ */
+ private List<HoodieInstant> loadInstants(
+ HoodieArchivedTimeline.TimeRangeFilter filter,
+ boolean loadInstantDetails,
+ Function<GenericRecord, Boolean> commitsFilter) {
+ try {
+ // List all files
+ FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+ new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+
+ // Sort files by version suffix in reverse (implies reverse chronological order)
+ Arrays.sort(fsStatuses, new ArchiveLogVersionComparator());
+
+ Set<HoodieInstant> instantsInRange = new HashSet<>();
+ for (FileStatus fs : fsStatuses) {
+ // Read the archived file
+ try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
+ new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+ int instantsInPreviousFile = instantsInRange.size();
+ // Read the avro blocks
+ while (reader.hasNext()) {
+ HoodieLogBlock block = reader.next();
+ if (block instanceof HoodieAvroDataBlock) {
+ HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
+ // TODO If we can store additional metadata in datablock, we can skip parsing records
+ // (such as startTime, endTime of records in the block)
+ try (ClosableIterator<HoodieRecord<IndexedRecord>> itr = avroBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
+ StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
+ // Filter blocks in desired time window
+ .map(r -> (GenericRecord) r.getData())
+ .filter(commitsFilter::apply)
+ .map(r -> readCommit(r, loadInstantDetails))
+ .filter(c -> filter == null || filter.isInRange(c.getTimestamp()))
+ .forEach(instantsInRange::add);
+ }
+ }
+ }
+
+ if (filter != null) {
+ int instantsInCurrentFile = instantsInRange.size() - instantsInPreviousFile;
+ if (instantsInPreviousFile > 0 && instantsInCurrentFile == 0) {
+ // Note that this is an optimization to skip reading unnecessary archived files
+ // This signals we crossed lower bound of desired time window.
+ break;
+ }
+ }
+ } catch (Exception oriException) {
+ // Merging small archive files may leave an uncompleted archive file, which causes an exception when read.
+ // That kind of exception needs to be ignored here.
+ try {
+ if (isCorruptCompactionFile(metaClient, fs)) {
+ continue;
+ }
+ throw oriException;
+ } catch (Exception e) {
+ // If anything goes wrong while parsing the merge archive plan, we throw the original exception.
+ // For example, a corrupted archive file and a corrupted plan may both exist.
+ throw oriException;
+ }
+ }
+ }
+
+ ArrayList<HoodieInstant> result = new ArrayList<>(instantsInRange);
+ Collections.sort(result);
+ return result;
+ } catch (IOException e) {
+ throw new HoodieIOException(
+ "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
+ }
+ }
+
+ public static boolean isCorruptCompactionFile(HoodieTableMetaClient metaClient, FileStatus fileStatus) throws IOException {
+ Path planPath = new Path(metaClient.getArchivePath(), MERGE_ARCHIVE_PLAN_NAME);
+ HoodieWrapperFileSystem fileSystem = metaClient.getFs();
+ if (fileSystem.exists(planPath)) {
+ HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
+ String mergedArchiveFileName = plan.getMergedArchiveFileName();
+ if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fileStatus.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
+ LOG.warn("Catch exception because of reading uncompleted merging archive file " + mergedArchiveFileName + ". Ignore it here.");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Sort files by reverse order of version suffix in file name.
+ */
+ public static class ArchiveLogVersionComparator implements Comparator<FileStatus>, Serializable {
+ @Override
+ public int compare(FileStatus f1, FileStatus f2) {
+ return Integer.compare(getArchivedFileSuffix(f2), getArchivedFileSuffix(f1));
+ }
+ }
+
+ private static int getArchivedFileSuffix(FileStatus f) {
+ try {
+ Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(f.getPath().getName());
+ if (fileMatcher.matches()) {
+ return Integer.parseInt(fileMatcher.group(1));
+ }
+ } catch (NumberFormatException e) {
+ // log and ignore any format warnings
+ LOG.warn("error getting suffix for archived file: " + f.getPath());
+ }
+ // return default value in case of any errors
+ return 0;
+ }
+}
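For reference, the legacy reader above orders old-format archive files such as '.commits_.archive.1', '.commits_.archive.12' by their numeric suffix in reverse (newest first). A standalone sketch of that ordering, reusing the same regex but reimplemented outside the class (names here are illustrative):

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    public class LegacyArchiveSortSketch {
      private static final Pattern ARCHIVE_FILE_PATTERN =
          Pattern.compile("^\\.commits_\\.archive\\.([0-9]+).*");

      static int suffix(String fileName) {
        Matcher m = ARCHIVE_FILE_PATTERN.matcher(fileName);
        return m.matches() ? Integer.parseInt(m.group(1)) : 0;
      }

      public static void main(String[] args) {
        List<String> files = Arrays.asList(".commits_.archive.1", ".commits_.archive.12", ".commits_.archive.3");
        // Reverse numeric order, i.e. newest archive files first, as in ArchiveLogVersionComparator.
        List<String> sorted = files.stream()
            .sorted(Comparator.comparingInt(LegacyArchiveSortSketch::suffix).reversed())
            .collect(Collectors.toList());
        System.out.println(sorted); // [.commits_.archive.12, .commits_.archive.3, .commits_.archive.1]
      }
    }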
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/InstantTriple.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/InstantTriple.java
new file mode 100644
index 0000000000000..9fc99dabeaf65
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/InstantTriple.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+
+import org.jetbrains.annotations.NotNull;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A triple of instants covering action states: requested, inflight, completed.
+ */
+public class InstantTriple implements Serializable, Comparable<InstantTriple> {
+ private final HoodieInstant requested;
+ private final HoodieInstant inflight;
+ private final HoodieInstant completed;
+
+ /**
+ * The constructor.
+ */
+ protected InstantTriple(@Nullable HoodieInstant requested, @Nullable HoodieInstant inflight, HoodieInstant completed) {
+ this.requested = requested;
+ this.inflight = inflight;
+ this.completed = completed;
+ }
+
+ public static InstantTriple fromInstants(List<HoodieInstant> instants) {
+ HoodieInstant requested = null;
+ HoodieInstant inflight = null;
+ HoodieInstant completed = null;
+ for (HoodieInstant instant : instants) {
+ if (instant.isRequested()) {
+ requested = instant;
+ } else if (instant.isInflight()) {
+ inflight = instant;
+ } else {
+ completed = instant;
+ }
+ }
+ return new InstantTriple(requested, inflight, Objects.requireNonNull(completed));
+ }
+
+ public HoodieInstant getRequested() {
+ return requested;
+ }
+
+ public HoodieInstant getInflight() {
+ return inflight;
+ }
+
+ public HoodieInstant getCompleted() {
+ return completed;
+ }
+
+ /**
+ * A COMPACTION action eventually becomes COMMIT when completed.
+ */
+ public String getAction() {
+ return this.completed.getAction();
+ }
+
+ public String getInstantTime() {
+ return this.completed.getTimestamp();
+ }
+
+ public String getCompletionTime() {
+ return this.completed.getStateTransitionTime();
+ }
+
+ public Option<byte[]> getCommitMetadata(HoodieTableMetaClient metaClient) {
+ Option<byte[]> content = metaClient.getActiveTimeline().getInstantDetails(this.completed);
+ if (content.isPresent() && content.get().length == 0) {
+ return Option.empty();
+ }
+ return content;
+ }
+
+ public Option<byte[]> getRequestedCommitMetadata(HoodieTableMetaClient metaClient) {
+ if (this.requested != null) {
+ Option<byte[]> requestedContent = metaClient.getActiveTimeline().getInstantDetails(this.requested);
+ if (!requestedContent.isPresent() || requestedContent.get().length == 0) {
+ return Option.empty();
+ } else {
+ return requestedContent;
+ }
+ } else {
+ return Option.empty();
+ }
+ }
+
+ public byte[] getCleanPlan(HoodieTableMetaClient metaClient) {
+ return metaClient.getActiveTimeline().readCleanerInfoAsBytes(getPendingInstant()).get();
+ }
+
+ public byte[] getCompactionPlan(HoodieTableMetaClient metaClient) {
+ return metaClient.getActiveTimeline().readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(getInstantTime())).get();
+ }
+
+ public byte[] getLogCompactionPlan(HoodieTableMetaClient metaClient) {
+ return metaClient.getActiveTimeline().readCompactionPlanAsBytes(HoodieTimeline.getLogCompactionRequestedInstant(getInstantTime())).get();
+ }
+
+ private HoodieInstant getPendingInstant() {
+ if (requested != null) {
+ return requested;
+ } else if (inflight != null) {
+ return inflight;
+ } else {
+ throw new AssertionError("Pending instant does not exist.");
+ }
+ }
+
+ @Override
+ public int compareTo(@NotNull InstantTriple other) {
+ return this.completed.getTimestamp().compareTo(other.completed.getTimestamp());
+ }
+}
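A small usage sketch of the class above (test-style, not part of the patch): the archiver groups the raw active-timeline files of one action by timestamp and builds a triple from the requested/inflight/completed instants, where only the completed instant is mandatory. Assumes the Hudi classes shown in this patch are on the classpath:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hudi.client.utils.InstantTriple;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    public class InstantTripleSketch {
      public static void main(String[] args) {
        List<HoodieInstant> states = Arrays.asList(
            new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "20230716103319"),
            new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "20230716103319"),
            new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "20230716103319"));
        InstantTriple triple = InstantTriple.fromInstants(states);
        // The triple exposes the completed instant's action/time plus the pending state files to delete.
        System.out.println(triple.getAction() + " @ " + triple.getInstantTime());
      }
    }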
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index cfd47ab2b374e..8714ba1bf2fe2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -18,9 +18,7 @@
package org.apache.hudi.client.utils;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
+import org.apache.hudi.avro.model.HoodieArchivedInstant;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
@@ -41,6 +39,12 @@
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
/**
* Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.
*/
@@ -137,6 +141,37 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInst
return archivedMetaWrapper;
}
+ public static HoodieArchivedInstant createArchivedInstant(InstantTriple triple, HoodieTableMetaClient metaClient) {
+ HoodieArchivedInstant archivedInstant = new HoodieArchivedInstant();
+ archivedInstant.setInstantTime(triple.getInstantTime());
+ archivedInstant.setCompletionTime(triple.getCompletionTime());
+ archivedInstant.setAction(triple.getAction());
+ triple.getCommitMetadata(metaClient).ifPresent(commitMetadata -> archivedInstant.setMetadata(ByteBuffer.wrap(commitMetadata)));
+ switch (triple.getAction()) {
+ case HoodieTimeline.CLEAN_ACTION: {
+ archivedInstant.setPlan(ByteBuffer.wrap(triple.getCleanPlan(metaClient)));
+ break;
+ }
+ case HoodieTimeline.REPLACE_COMMIT_ACTION: {
+ // we may have cases with empty HoodieRequestedReplaceMetadata e.g. insert_overwrite_table or insert_overwrite
+ // without clustering. However, we should revisit the requested commit file standardization
+ triple.getRequestedCommitMetadata(metaClient).ifPresent(metadata -> archivedInstant.setPlan(ByteBuffer.wrap(metadata)));
+ break;
+ }
+ case HoodieTimeline.COMPACTION_ACTION: {
+ archivedInstant.setPlan(ByteBuffer.wrap(triple.getCompactionPlan(metaClient)));
+ break;
+ }
+ case HoodieTimeline.LOG_COMPACTION_ACTION: {
+ archivedInstant.setPlan(ByteBuffer.wrap(triple.getLogCompactionPlan(metaClient)));
+ break;
+ }
+ default:
+ // no operation
+ }
+ return archivedInstant;
+ }
+
public static HoodieArchivedMetaEntry createMetaWrapperForEmptyInstant(HoodieInstant hoodieInstant) {
HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
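For orientation, each archived entry produced by createArchivedInstant carries the instant time, completion time, action, the serialized commit metadata, and, for clean/compaction/log-compaction/replace actions, the plan bytes. A hedged sketch that builds one entry by hand with illustrative values, using only the setters visible in the hunk above (in the patch these values come from the InstantTriple and the active timeline):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.hudi.avro.model.HoodieArchivedInstant;

    public class ArchivedInstantSketch {
      public static HoodieArchivedInstant sample() {
        HoodieArchivedInstant entry = new HoodieArchivedInstant();
        entry.setInstantTime("20230716103319");
        entry.setCompletionTime("20230716103425");
        entry.setAction("commit");
        entry.setMetadata(ByteBuffer.wrap("{\"operationType\":\"UPSERT\"}".getBytes(StandardCharsets.UTF_8)));
        return entry;
      }
    }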
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
index 23eaa276c5538..c4876d3a0a610 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -94,19 +94,6 @@ public class HoodieArchivalConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("The number of small archive files to be merged at once.");
- public static final ConfigProperty ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
- .key("hoodie.archive.merge.small.file.limit.bytes")
- .defaultValue(20L * 1024 * 1024)
- .markAdvanced()
- .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");
-
- public static final ConfigProperty ARCHIVE_MERGE_ENABLE = ConfigProperty
- .key("hoodie.archive.merge.enable")
- .defaultValue(false)
- .markAdvanced()
- .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
- + " useful when storage scheme doesn't support append operation.");
-
public static final ConfigProperty ARCHIVE_BEYOND_SAVEPOINT = ConfigProperty
.key("hoodie.archive.beyond.savepoint")
.defaultValue(false)
@@ -191,16 +178,6 @@ public HoodieArchivalConfig.Builder withArchiveMergeFilesBatchSize(int number) {
return this;
}
- public HoodieArchivalConfig.Builder withArchiveMergeSmallFileLimit(long size) {
- archivalConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
- return this;
- }
-
- public HoodieArchivalConfig.Builder withArchiveMergeEnable(boolean enable) {
- archivalConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
- return this;
- }
-
public HoodieArchivalConfig.Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
archivalConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1581e21c070be..f2f30fb3ec08d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1513,18 +1513,10 @@ public boolean isAutoClean() {
return getBoolean(HoodieCleanConfig.AUTO_CLEAN);
}
- public boolean getArchiveMergeEnable() {
- return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
- }
-
public boolean shouldArchiveBeyondSavepoint() {
return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT);
}
- public long getArchiveMergeSmallFileLimitBytes() {
- return getLong(HoodieArchivalConfig.ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES);
- }
-
public boolean isAutoArchive() {
return getBoolean(HoodieArchivalConfig.AUTO_ARCHIVE);
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyInstantTriple.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyInstantTriple.java
new file mode 100644
index 0000000000000..f63043ee418fc
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyInstantTriple.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.client.utils.InstantTriple;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * Instant triple for testing.
+ */
+public class DummyInstantTriple extends InstantTriple {
+ private final byte[] commitMetadata;
+
+ /**
+ * Only for testing purpose.
+ */
+ public DummyInstantTriple(HoodieInstant completed, byte[] commitMetadata) {
+ super(new HoodieInstant(HoodieInstant.State.REQUESTED, completed.getAction(), completed.getTimestamp()),
+ new HoodieInstant(HoodieInstant.State.INFLIGHT, completed.getAction(), completed.getTimestamp()),
+ completed);
+ this.commitMetadata = commitMetadata;
+ }
+
+ @Override
+  public Option<byte[]> getCommitMetadata(HoodieTableMetaClient metaClient) {
+ return Option.of(this.commitMetadata);
+ }
+}
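For orientation only (not part of the patch), a hypothetical usage sketch: a test pairs a completed instant with canned commit-metadata bytes so that callers of InstantTriple#getCommitMetadata never touch the file system, while the requested and inflight instants are derived from the completed one by the constructor above. The wrapper class and string literals below are illustrative assumptions.

import org.apache.hudi.DummyInstantTriple;
import org.apache.hudi.client.utils.InstantTriple;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

import java.nio.charset.StandardCharsets;

public class DummyInstantTripleUsage {
  public static void main(String[] args) {
    HoodieInstant completed =
        new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000001");
    // inject fake commit metadata bytes instead of reading a .commit file
    InstantTriple triple =
        new DummyInstantTriple(completed, "{\"fake\":\"commit metadata\"}".getBytes(StandardCharsets.UTF_8));
    // the metaClient argument is ignored by the dummy, so null is fine for this sketch
    Option<byte[]> metadata = triple.getCommitMetadata(null);
    System.out.println(new String(metadata.get(), StandardCharsets.UTF_8));
  }
}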
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 5b8f7cd20c71f..7419e0e3a912d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -33,7 +33,6 @@
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -62,7 +61,6 @@
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
@@ -105,9 +103,9 @@
import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@@ -135,7 +133,7 @@ public void init(HoodieTableType tableType) throws Exception {
hadoopConf.addResource(wrapperFs.getConf());
}
- private void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) throws IOException {
+ private void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) {
if (enableMetadataTable) {
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context);
// reload because table configs could have been updated
@@ -162,7 +160,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
- maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200,
+ maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, 10,
HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
}
@@ -172,7 +170,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
- 5, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200,
+ 5, maxDeltaCommitsMetadataTable, tableType, 10,
HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
}
@@ -180,11 +178,9 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int minArchivalCommits,
int maxArchivalCommits,
int maxDeltaCommitsMetadataTable,
- boolean enableArchiveMerge,
- int archiveFilesBatch,
- long size) throws Exception {
+ int archiveFilesBatch) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, 5,
- maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size,
+ maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, archiveFilesBatch,
HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
}
@@ -194,9 +190,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int maxDeltaCommits,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType,
- boolean enableArchiveMerge,
int archiveFilesBatch,
- long size,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
WriteConcurrencyMode writeConcurrencyMode) throws Exception {
return initTestTableAndGetWriteConfig(
@@ -206,9 +200,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
maxDeltaCommits,
maxDeltaCommitsMetadataTable,
tableType,
- enableArchiveMerge,
archiveFilesBatch,
- size,
failedWritesCleaningPolicy,
writeConcurrencyMode,
ARCHIVE_BEYOND_SAVEPOINT.defaultValue());
@@ -220,9 +212,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int maxDeltaCommits,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType,
- boolean enableArchiveMerge,
int archiveFilesBatch,
- long size,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
WriteConcurrencyMode writeConcurrencyMode,
boolean archiveProceedBeyondSavepoints) throws Exception {
@@ -231,9 +221,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).withFailedWritesCleaningPolicy(failedWritesCleaningPolicy).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
- .withArchiveMergeEnable(enableArchiveMerge)
.withArchiveMergeFilesBatchSize(archiveFilesBatch)
- .withArchiveMergeSmallFileLimit(size)
.archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
.withArchiveBeyondSavepoint(archiveProceedBeyondSavepoints).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -454,7 +442,7 @@ protected static HoodieCommitMetadata generateCommitMetadata(
public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
boolean enableMetadata = false;
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2, HoodieTableType.COPY_ON_WRITE,
- false, 10, 209715200, HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint);
+ 10, HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint);
// min archival commits is 2 and max archival commits is 4. and so, after 5th commit, 3 commits will be archived.
for (int i = 1; i < 5; i++) {
@@ -502,10 +490,9 @@ public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exc
getActiveCommitInstants(Arrays.asList("00000008", "00000009")), commitsAfterArchival);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception {
- HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, enableArchiveMerge, 3, 209715200);
+ @Test
+ public void testCompactionWithCorruptManifestFile() throws Exception {
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 10; i++) {
@@ -513,27 +500,23 @@ public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableA
archiveAndGetCommitsList(writeConfig);
}
- // build a merge small archive plan with dummy content
+ // build a compaction archive plan with dummy content
// this plan can not be deserialized.
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
-    List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
-
- archiver.reOpenWriter();
- Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
- archiver.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1");
- String s = "Dummy Content";
- // stain the current merge plan file.
- FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes()));
-
- // check that damaged plan file will not block archived timeline loading.
+    List<String> files = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
+ files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
+
+ archiver.updateManifest("dummy.parquet");
+ // remove the version file created by the last manifest update
+ metaClient.getFs().delete(HoodieArchivedTimeline.getVersionFilePath(metaClient, HoodieArchivedTimeline.latestSnapshotVersion(metaClient)));
+
+    // check that an invalid manifest file will not block archived timeline loading.
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
- assertEquals(9 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
+ assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
- // trigger several archive after left damaged merge small archive file plan.
+    // trigger several archivals with the invalid manifest file in place.
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
@@ -544,19 +527,18 @@ public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableA
HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
// check instant number
- assertEquals(18 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
+ assertEquals(4 * 3 + 14, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
- // if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline.
- Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
- FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
+    // even if there are damaged archive files, hoodie can still load the timeline correctly.
+ Path damagedFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet");
+ FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of("dummy".getBytes()));
- assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
+ assertDoesNotThrow(() -> metaClient.getArchivedTimeline().reload(), "Archived timeline can skip the invalid data and manifest files smartly");
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testMergeSmallArchiveFilesRecoverFromMergeFailed(boolean enableArchiveMerge) throws Exception {
- HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, enableArchiveMerge, 3, 209715200);
+ @Test
+ public void testCompactionRecoverWithoutManifestFile() throws Exception {
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 10; i++) {
@@ -567,121 +549,36 @@ public void testMergeSmallArchiveFilesRecoverFromMergeFailed(boolean enableArchi
// do a single merge small archive files
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
-    List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
- archiver.reOpenWriter();
+    List<String> candidateFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
+ candidateFiles.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
- archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
- archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
- HoodieLogFormat.Writer writer = archiver.reOpenWriter();
+ String compactedFileName = HoodieTimelineArchiver.compactedFileName(candidateFiles);
+ archiver.compactArchiveFiles(candidateFiles, compactedFileName);
// check loading archived and active timeline success
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
- assertEquals(9 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.reload().countInstants());
-
- String s = "Dummy Content";
- // stain the current merged archive file.
- FileIOUtils.createFileInPath(metaClient.getFs(), writer.getLogFile().getPath(), Option.of(s.getBytes()));
-
- // do another archive actions with merge small archive files.
- for (int i = 1; i < 10; i++) {
- testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
- archiveAndGetCommitsList(writeConfig);
- }
-
- // check result.
- // we need to load archived timeline successfully and ignore the parsing damage merged archive files exception.
- HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
- HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
-
- assertEquals(18 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
-
- // if there are a damaged merged archive files and other common damaged archive file.
- // hoodie need throw ioe while loading archived timeline because of parsing the damaged archive file.
- Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
- FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
-
- assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
+ assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.reload().countInstants());
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testMergeSmallArchiveFilesRecoverFromDeleteFailed(boolean enableArchiveMerge) throws Exception {
- HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, enableArchiveMerge, 3, 209715200);
+ @Test
+ public void testCompactionCleaning() throws Exception {
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);
// do ingestion and trigger archive actions here.
- for (int i = 1; i < 10; i++) {
+ for (int i = 1; i < 19; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
-
- // do a single merge small archive files
- HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
- HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
-    List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
-
- archiver.reOpenWriter();
-
- archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
- archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
- archiver.reOpenWriter();
-
- // delete only one of the small archive file to simulate delete action failed.
- metaClient.getFs().delete(fsStatuses[0].getPath());
+    // now we have snapshot versions 6, 7, 8 and 9
// loading archived timeline and active timeline success
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
- assertEquals(9 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
-
- // do another archive actions with merge small archive files.
- for (int i = 1; i < 10; i++) {
- testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
- archiveAndGetCommitsList(writeConfig);
- }
-
- // check result.
- HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
- HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
+ assertEquals(4 * 3 + 14, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
- assertEquals(18 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerge) throws Exception {
- HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, enableArchiveMerge, 3, 209715200);
-
- // do ingestion and trigger archive actions here.
- int numInstant = 12;
- for (int i = 1; i < 12; i++) {
- testTable.doWriteOperation(
- "000000" + String.format("%02d", i),
- WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
- Arrays.asList("p1", "p2"),
- 2);
- archiveAndGetCommitsList(writeConfig);
- }
-
- Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
- String s = "Dummy Content";
- // stain the current merge plan file.
- FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes()));
-
- // check that damaged plan file will not block archived timeline loading.
- HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
- HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
- assertEquals((numInstant - 1) * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
-
- // if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline.
- Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
- FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
-
- assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
+ assertEquals(9, HoodieArchivedTimeline.latestSnapshotVersion(metaClient));
+ assertEquals(Arrays.asList(6, 7, 8, 9), HoodieArchivedTimeline.allSnapshotVersions(metaClient).stream().sorted().collect(Collectors.toList()));
}
@Test
@@ -696,7 +593,7 @@ public void testArchivalWithMultiWriters() throws Exception {
private void testArchivalWithMultiWriters(boolean enableMetadata) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 4, 5, 5, 2,
- HoodieTableType.COPY_ON_WRITE, false, 10, 209715200,
+ HoodieTableType.COPY_ON_WRITE, 10,
HoodieFailedWritesCleaningPolicy.LAZY, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
final ExecutorService executors = Executors.newFixedThreadPool(2);
@@ -765,45 +662,6 @@ public static CompletableFuture allOfTerminateOnFailure(List
-    List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
-
- archiver.reOpenWriter();
-
- archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
- archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
- HoodieLogFormat.Writer writer = archiver.reOpenWriter();
-
- String s = "Dummy Content";
- // stain the current merged archive file.
- FileIOUtils.createFileInPath(metaClient.getFs(), writer.getLogFile().getPath(), Option.of(s.getBytes()));
-
- // if there's only a damaged merged archive file, we need to ignore the exception while reading this damaged file.
- HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
- HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline();
-
- assertEquals(9 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
-
- // if there are a damaged merged archive files and other common damaged archive file.
- // hoodie need throw ioe while loading archived timeline because of parsing the damaged archive file.
- Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
- FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
-
- assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
- }
-
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testNoArchivalUntilMaxArchiveConfigWithExtraInflightCommits(boolean enableMetadata) throws Exception {
@@ -1013,7 +871,6 @@ public void testNoArchivalWithInflightCompactionInMiddle(boolean enableMetadata)
    List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
    List<HoodieInstant> archivedInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000003", "00000004"), HoodieTimeline.DELTA_COMMIT_ACTION);
- archivedInstants.add(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "00000002"));
archivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000002"));
verifyArchival(archivedInstants,
getActiveCommitInstants(Arrays.asList("00000005", "00000006", "00000007", "00000008"), HoodieTimeline.DELTA_COMMIT_ACTION),
@@ -1138,7 +995,7 @@ public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exce
} else {
if (i == 8) {
// when i == 7 compaction in metadata table will be triggered
- // and afterwards archival in datatable will kick in when i == 8.
+        // and afterwards archival in the data table will kick in when i == 8.
// 1,2,3,4,5,6,7,8 : after archival -> 1,4,5,6,7,8 (bcoz, 2,3,4,5 and 6 are clean commits and are eligible for archival)
List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001", "00000008")));
@@ -1210,15 +1067,11 @@ public void testArchiveCompletedRollbackAndClean(boolean isEmpty, boolean enable
List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
createCleanMetadata(startInstant + "", false, false, isEmpty || i % 2 == 0);
- expectedArchivedInstants.add(new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startInstant + ""));
- expectedArchivedInstants.add(new HoodieInstant(State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, startInstant + ""));
expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, startInstant + ""));
}
for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false, isEmpty || i % 2 == 0);
- expectedArchivedInstants.add(new HoodieInstant(State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, startInstant + ""));
- expectedArchivedInstants.add(new HoodieInstant(State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, startInstant + ""));
expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.ROLLBACK_ACTION, startInstant + ""));
}
@@ -1772,24 +1625,13 @@ private void verifyArchival(List expectedArchivedInstants, List
-  private List<HoodieInstant> getArchivedInstants(HoodieInstant instant) {
-    List<HoodieInstant> instants = new ArrayList<>();
- if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) || instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)
- || instant.getAction().equals(HoodieTimeline.CLEAN_ACTION) || instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
- instants.add(new HoodieInstant(State.REQUESTED, instant.getAction(), instant.getTimestamp()));
- }
- instants.add(new HoodieInstant(State.INFLIGHT, instant.getAction(), instant.getTimestamp()));
- instants.add(new HoodieInstant(State.COMPLETED, instant.getAction(), instant.getTimestamp()));
- return instants;
- }
-
  private List<HoodieInstant> getAllArchivedCommitInstants(List<String> commitTimes) {
    return getAllArchivedCommitInstants(commitTimes, HoodieTimeline.COMMIT_ACTION);
  }
  private List<HoodieInstant> getAllArchivedCommitInstants(List<String> commitTimes, String action) {
    List<HoodieInstant> allInstants = new ArrayList<>();
- commitTimes.forEach(entry -> allInstants.addAll(getArchivedInstants(new HoodieInstant(State.COMPLETED, action, entry))));
+ commitTimes.forEach(commitTime -> allInstants.add(new HoodieInstant(State.COMPLETED, action, commitTime)));
return allInstants;
}
diff --git a/hudi-common/src/main/avro/HoodieArchivedInstant.avsc b/hudi-common/src/main/avro/HoodieArchivedInstant.avsc
new file mode 100644
index 0000000000000..9048d7d4f8ccf
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieArchivedInstant.avsc
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "type":"record",
+ "name":"HoodieArchivedInstant",
+ "namespace":"org.apache.hudi.avro.model",
+ "fields":[
+ {
+ "name":"instantTime",
+ "type":["null","string"],
+ "default": null
+ },
+ {
+ "name":"completionTime",
+ "type":["null","string"],
+ "default": null
+ },
+ {
+ "name":"action",
+ "type":["null","string"],
+ "default": null
+ },
+ {
+ "name":"metadata",
+ "type":["null", "bytes"],
+ "default": null
+ },
+ {
+ "name":"plan",
+ "type":["null", "bytes"],
+ "default": null
+ }
+ ]
+}
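As a hedged illustration (not part of the patch), the record above would typically be materialized through the Avro-generated org.apache.hudi.avro.model.HoodieArchivedInstant class, one entry per completed instant, with either the metadata or the plan field populated. The wrapper class and values below are made up for the example.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.hudi.avro.model.HoodieArchivedInstant;

public class ArchivedInstantRecordExample {
  public static void main(String[] args) {
    HoodieArchivedInstant entry = HoodieArchivedInstant.newBuilder()
        .setInstantTime("20230716103319000")       // instant (start) time
        .setCompletionTime("20230716103325000")    // completion time kept alongside it
        .setAction("commit")
        // serialized commit metadata; stays null for entries that only carry a plan
        .setMetadata(ByteBuffer.wrap("{}".getBytes(StandardCharsets.UTF_8)))
        .setPlan(null)                             // only compaction-style actions fill this
        .build();
    System.out.println(entry.getInstantTime() + " -> " + entry.getAction());
  }
}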
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index afb590e9cc045..862a712ea67a3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -983,7 +983,7 @@ public PropertyBuilder set(Map props) {
public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
return setTableType(metaClient.getTableType())
.setTableName(metaClient.getTableConfig().getTableName())
- .setArchiveLogFolder(metaClient.getArchivePath())
+ .setArchiveLogFolder(metaClient.getTableConfig().getArchivelogFolder())
.setPayloadClassName(metaClient.getTableConfig().getPayloadClass())
.setRecordMergerStrategy(metaClient.getTableConfig().getRecordMergerStrategy());
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index dbfe484531aa4..2538aa9d87626 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -366,7 +366,7 @@ public Option<byte[]> readRollbackInfoAsBytes(HoodieInstant instant) {
  public Option<byte[]> readRestoreInfoAsBytes(HoodieInstant instant) {
// Rollback metadata are always stored only in timeline .hoodie
- return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
+ return readDataFromPath(getInstantFileNamePath(instant.getFileName()));
}
//-----------------------------------------------------------------
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index eb4dc631ed602..8a8a15a3e16bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -18,75 +18,125 @@
package org.apache.hudi.common.table.timeline;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
-import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.avro.model.HoodieArchivedInstant;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.ArchivedMetaEntryReadSchemas;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Spliterator;
-import java.util.Spliterators;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.stream.StreamSupport;
+import java.util.stream.Collectors;
/**
- * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
- * ActiveTimeline and the rest are in ArchivedTimeline.
- *
- *
- * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call
- * reload()
- *
- *
- * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
+ * Represents the Archived Timeline for the Hoodie table.
+ *
+ * After several instants are accumulated as a batch on the active timeline, they are archived as a parquet file onto the archived timeline.
+ * In general, the archived timeline is comprised of parquet files organized in an LSM-style file layout. Each new operation on the archived timeline generates
+ * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline.
+ *
+ *
The Archived Timeline Layout
+ *
+ *
+ * t111, t112 ... t120 ... ->
+ * \ /
+ * \ /
+ * |
+ * V
+ * t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0
+ * \ /
+ * \ /
+ * |
+ * V
+ * t11_t100_1.parquet L1
+ *
+ * manifest_1, manifest_2, ... manifest_12
+ * | | |
+ * V V V
+ * _version_1, _version_2, ... _version_12
+ *
+ *
+ * The LSM Tree Compaction
+ * A universal compaction strategy is used: when N (10 by default) parquet files exist in the current layer, they are merged and flushed as a larger file in the next layer.
+ * There is no limit on the number of layers. Assuming each file in L0 holds 10 instants, a file in L1 could hold 100 instants,
+ * so 3000 instants could be represented by just 3 parquet files in L2; loading them is fast with concurrent reads.
+ *
+ * The benchmark shows that reading 1000 instants costs about 10 ms.
+ *
+ *
The Archiver & Reader Snapshot Isolation
+ *
+ * In order to provide snapshot isolation between archived timeline writes and reads, two kinds of metadata files are added for LSM tree version management:
+ *
+ * - Manifest file: each new file in layer 0 and each compaction generates a new manifest file; the manifest file records the valid file handles of the latest snapshot;
+ * - Version file: A version file is generated right after a complete manifest file is formed.
+ *
+ *
+ * The Reader Workflow
+ *
+ * - read the latest version;
+ * - read the manifest file for valid file handles;
+ * - optionally skip data files using the min/max instant times encoded in the parquet file names.
+ *
+ *
+ * The Legacy Files Cleaning and Read Retention
+ * File cleaning is only triggered after a valid compaction.
+ *
+ * Clean Strategy
+ * Only the 3 latest valid snapshot versions are kept for readers, which means a file is kept for at least 3 archival trigger intervals; with the default configuration that is a span of 30 instants,
+ * far longer than the time needed to load the archived timeline.
+ *
+ * Instants TTL
+ * The timeline reader only reads instants from the last 7 days; instants archived more than one week ago are skipped.
+ *
+ * Timeline Refresh
+ * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call
+ * #reload().
+ *
+ *
This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
*/
public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
- public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan";
+ private static final String VERSION_FILE_PREFIX = "_version_"; // _version_[N]
+ private static final String MANIFEST_FILE_PREFIX = "manifest_"; // manifest_[N]
+ public static final int FILE_LAYER_ZERO = 0;
private static final Pattern ARCHIVE_FILE_PATTERN =
- Pattern.compile("^\\.commits_\\.archive\\.([0-9]+).*");
-
- private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits";
- private static final String ACTION_TYPE_KEY = "actionType";
- private static final String ACTION_STATE = "actionState";
- private static final String STATE_TRANSITION_TIME = "stateTransitionTime";
+ Pattern.compile("^(\\d+)_(\\d+)_(\\d)\\.parquet");
+ private static final String INSTANT_TIME_ARCHIVED_META_FIELD = "instantTime";
+ private static final String COMPLETION_TIME_ARCHIVED_META_FIELD = "completionTime";
+ private static final String ACTION_ARCHIVED_META_FIELD = "action";
+ private static final String METADATA_ARCHIVED_META_FIELD = "metadata";
+ private static final String PLAN_ARCHIVED_META_FIELD = "plan";
private HoodieTableMetaClient metaClient;
-  private final Map<String, byte[]> readCommits = new HashMap<>();
+  private final Map<String, byte[]> readCommits = new ConcurrentHashMap<>();
private static final Logger LOG = LoggerFactory.getLogger(HoodieArchivedTimeline.class);
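To make the Javadoc above concrete, here is a self-contained sketch (plain Java, not Hudi code) of the reader workflow it describes: resolve the highest _version_N pointer, read manifest_N for the valid parquet handles, then skip files using the <minInstant>_<maxInstant>_<layer>.parquet naming convention. The archive folder is faked with an in-memory map; the real reader goes through the FileSystem API of the meta client.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LsmTimelineReaderSketch {
  private static final Pattern ARCHIVE_FILE_PATTERN = Pattern.compile("^(\\d+)_(\\d+)_(\\d)\\.parquet");

  public static void main(String[] args) {
    // simulated contents of the archive folder: manifests and version pointers
    Map<String, String> archiveDir = new LinkedHashMap<>();
    archiveDir.put("manifest_1", "00000001_00000010_0.parquet");
    archiveDir.put("_version_1", "");
    archiveDir.put("manifest_2", "00000001_00000010_0.parquet,00000011_00000020_0.parquet");
    archiveDir.put("_version_2", "");

    // 1. read the latest snapshot version
    int latestVersion = archiveDir.keySet().stream()
        .filter(name -> name.startsWith("_version_"))
        .mapToInt(name -> Integer.parseInt(name.substring("_version_".length())))
        .max().orElse(-1);

    // 2. read the manifest file for the valid file handles
    List<String> files = Arrays.asList(archiveDir.get("manifest_" + latestVersion).split(","));

    // 3. data skipping: keep only files whose max instant can reach the requested start time
    String startTs = "00000015";
    List<String> candidates = new ArrayList<>();
    for (String fileName : files) {
      Matcher matcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
      if (matcher.matches() && matcher.group(2).compareTo(startTs) >= 0) {
        candidates.add(fileName);
      }
    }
    System.out.println("snapshot v" + latestVersion + " -> candidate files: " + candidates);
  }
}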
@@ -97,7 +147,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
*/
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
this.metaClient = metaClient;
- setInstants(this.loadInstants(false));
+ setInstants(this.loadInstants());
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
@@ -109,8 +159,7 @@ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
*/
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String startTs) {
this.metaClient = metaClient;
- setInstants(loadInstants(new StartTsFilter(startTs), true,
- record -> HoodieInstant.State.COMPLETED.toString().equals(record.get(ACTION_STATE).toString())));
+ setInstants(loadInstants(new StartTsFilter(startTs), LoadMode.COMMIT_META));
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
@@ -133,21 +182,12 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
in.defaultReadObject();
}
- public static Path getArchiveLogPath(String archiveFolder) {
- return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX);
- }
-
public void loadInstantDetailsInMemory(String startTs, String endTs) {
loadInstants(startTs, endTs);
}
public void loadCompletedInstantDetailsInMemory() {
- loadInstants(null, true,
- record -> {
- // Very old archived instants don't have action state set.
- Object action = record.get(ACTION_STATE);
- return action == null || HoodieInstant.State.COMPLETED.toString().equals(action.toString());
- });
+ loadInstants(null, LoadMode.COMMIT_META);
}
public void loadCompactionDetailsInMemory(String compactionInstantTime) {
@@ -156,13 +196,9 @@ public void loadCompactionDetailsInMemory(String compactionInstantTime) {
public void loadCompactionDetailsInMemory(String startTs, String endTs) {
// load compactionPlan
- loadInstants(new TimeRangeFilter(startTs, endTs), true,
- record -> {
- // Older files don't have action state set.
- Object action = record.get(ACTION_STATE);
- return record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
- && (action == null || HoodieInstant.State.INFLIGHT.toString().equals(action.toString()));
- }
+ loadInstants(new TimeRangeFilter(startTs, endTs), LoadMode.PLAN,
+ record -> record.get(ACTION_ARCHIVED_META_FIELD).toString().equals(HoodieTimeline.COMMIT_ACTION)
+ && record.get(PLAN_ARCHIVED_META_FIELD) != null
);
}
@@ -184,146 +220,306 @@ public HoodieArchivedTimeline reload() {
return new HoodieArchivedTimeline(metaClient);
}
- private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
- final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
- final String action = record.get(ACTION_TYPE_KEY).toString();
- final String stateTransitionTime = (String) record.get(STATE_TRANSITION_TIME);
- if (loadDetails) {
- getMetadataKey(action).map(key -> {
- Object actionData = record.get(key);
- if (actionData != null) {
- if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
- this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData));
- } else {
- this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8));
- }
+ private HoodieInstant readCommit(GenericRecord record, LoadMode loadMode) {
+ final String instantTime = record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
+ final String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
+ final String completionTime = (String) record.get(COMPLETION_TIME_ARCHIVED_META_FIELD);
+ loadInstantDetails(record, instantTime, loadMode);
+ return new HoodieInstant(HoodieInstant.State.COMPLETED, action, instantTime, completionTime);
+ }
+
+ private void loadInstantDetails(GenericRecord record, String instantTime, LoadMode loadMode) {
+ switch (loadMode) {
+ case COMMIT_META:
+ ByteBuffer commitMeta = (ByteBuffer) record.get(METADATA_ARCHIVED_META_FIELD);
+ if (commitMeta != null) {
+ // in case the entry comes from an empty completed meta file
+ this.readCommits.put(instantTime, commitMeta.array());
}
- return null;
- });
- }
- return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action,
- instantTime, stateTransitionTime);
- }
-
- @Nonnull
- private Option getMetadataKey(String action) {
- switch (action) {
- case HoodieTimeline.CLEAN_ACTION:
- return Option.of("hoodieCleanMetadata");
- case HoodieTimeline.COMMIT_ACTION:
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- return Option.of("hoodieCommitMetadata");
- case HoodieTimeline.ROLLBACK_ACTION:
- return Option.of("hoodieRollbackMetadata");
- case HoodieTimeline.SAVEPOINT_ACTION:
- return Option.of("hoodieSavePointMetadata");
- case HoodieTimeline.COMPACTION_ACTION:
- case HoodieTimeline.LOG_COMPACTION_ACTION:
- return Option.of("hoodieCompactionPlan");
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
- return Option.of("hoodieReplaceCommitMetadata");
- case HoodieTimeline.INDEXING_ACTION:
- return Option.of("hoodieIndexCommitMetadata");
+ break;
+ case PLAN:
+ ByteBuffer plan = (ByteBuffer) record.get(PLAN_ARCHIVED_META_FIELD);
+ if (plan != null) {
+ // in case the entry comes from an empty completed meta file
+ this.readCommits.put(instantTime, plan.array());
+ }
+ break;
default:
- LOG.error(String.format("Unknown action in metadata (%s)", action));
- return Option.empty();
+ // no operation
}
}
-  private List<HoodieInstant> loadInstants(boolean loadInstantDetails) {
-    return loadInstants(null, loadInstantDetails);
+  private List<HoodieInstant> loadInstants() {
+ return loadInstants(null, LoadMode.SHIM);
}
private List<HoodieInstant> loadInstants(String startTs, String endTs) {
- return loadInstants(new TimeRangeFilter(startTs, endTs), true);
+ return loadInstants(new TimeRangeFilter(startTs, endTs), LoadMode.COMMIT_META);
}
-  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
-    return loadInstants(filter, loadInstantDetails, record -> true);
+  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, LoadMode loadMode) {
+ return loadInstants(filter, loadMode, r -> true);
}
/**
* This is method to read selected instants. Do NOT use this directly use one of the helper methods above
* If loadInstantDetails is set to true, this would also update 'readCommits' map with commit details
* If filter is specified, only the filtered instants are loaded
- * If commitsFilter is specified, only the filtered records are loaded
+ * If commitsFilter is specified, only the filtered records are loaded.
*/
-  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
-                                           Function<GenericRecord, Boolean> commitsFilter) {
+  private List<HoodieInstant> loadInstants(
+      @Nullable TimeRangeFilter filter,
+      LoadMode loadMode,
+      Function<GenericRecord, Boolean> commitsFilter) {
try {
// List all files
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
-
- // Sort files by version suffix in reverse (implies reverse chronological order)
- Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
-
-      Set<HoodieInstant> instantsInRange = new HashSet<>();
- for (FileStatus fs : fsStatuses) {
- // Read the archived file
- try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
- new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
- int instantsInPreviousFile = instantsInRange.size();
- // Read the avro blocks
- while (reader.hasNext()) {
- HoodieLogBlock block = reader.next();
- if (block instanceof HoodieAvroDataBlock) {
- HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
- // TODO If we can store additional metadata in datablock, we can skip parsing records
- // (such as startTime, endTime of records in the block)
-            try (ClosableIterator<HoodieRecord<IndexedRecord>> itr = avroBlock.getRecordIterator(HoodieRecordType.AVRO)) {
- StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
- // Filter blocks in desired time window
- .map(r -> (GenericRecord) r.getData())
- .filter(commitsFilter::apply)
- .map(r -> readCommit(r, loadInstantDetails))
- .filter(c -> filter == null || filter.isInRange(c))
- .forEach(instantsInRange::add);
- }
- }
- }
-
- if (filter != null) {
- int instantsInCurrentFile = instantsInRange.size() - instantsInPreviousFile;
- if (instantsInPreviousFile > 0 && instantsInCurrentFile == 0) {
- // Note that this is an optimization to skip reading unnecessary archived files
- // This signals we crossed lower bound of desired time window.
- break;
- }
- }
- } catch (Exception originalException) {
- // merge small archive files may left uncompleted archive file which will cause exception.
- // need to ignore this kind of exception here.
- try {
- Path planPath = new Path(metaClient.getArchivePath(), MERGE_ARCHIVE_PLAN_NAME);
- HoodieWrapperFileSystem fileSystem = metaClient.getFs();
- if (fileSystem.exists(planPath)) {
- HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
- String mergedArchiveFileName = plan.getMergedArchiveFileName();
- if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fs.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
- LOG.warn("Catch exception because of reading uncompleted merging archive file " + mergedArchiveFileName + ". Ignore it here.");
- continue;
- }
- }
- throw originalException;
- } catch (Exception e) {
- // If anything wrong during parsing merge archive plan, we need to throw the original exception.
- // For example corrupted archive file and corrupted plan are both existed.
- throw originalException;
- }
- }
- }
-
-      ArrayList<HoodieInstant> result = new ArrayList<>(instantsInRange);
+      List<String> fileNames = latestSnapshotFiles(metaClient);
+
+      Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
+ Schema readSchema = getReadSchema(loadMode);
+ fileNames.stream()
+ .filter(fileName -> filter == null || isFileInRange(filter, fileName))
+ .parallel().forEach(fileName -> {
+ // Read the archived file
+ try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ .getFileReader(metaClient.getHadoopConf(), new Path(metaClient.getArchivePath(), fileName))) {
+            try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieArchivedInstant.getClassSchema(), readSchema)) {
+ while (iterator.hasNext()) {
+ GenericRecord record = (GenericRecord) iterator.next();
+ String instantTime = record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
+ if (!instantsInRange.containsKey(instantTime)
+ && (filter == null || filter.isInRange(instantTime))
+ && commitsFilter.apply(record)) {
+ HoodieInstant instant = readCommit(record, loadMode);
+ instantsInRange.put(instantTime, instant);
+ }
+ }
+ }
+ } catch (IOException ioException) {
+            throw new HoodieIOException("Error opening file reader for path: " + new Path(metaClient.getArchivePath(), fileName), ioException);
+ }
+ });
+
+      ArrayList<HoodieInstant> result = new ArrayList<>(instantsInRange.values());
Collections.sort(result);
return result;
} catch (IOException e) {
throw new HoodieIOException(
- "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
+ "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
+ }
+ }
+
+ @Override
+ public HoodieDefaultTimeline getWriteTimeline() {
+ // filter in-memory instants
+    Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
+ return new HoodieDefaultTimeline(getInstantsAsStream().filter(i ->
+ readCommits.containsKey(i.getTimestamp()))
+ .filter(s -> validActions.contains(s.getAction())), details);
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+ private static Schema getReadSchema(LoadMode loadMode) {
+ switch (loadMode) {
+ case SHIM:
+ return ArchivedMetaEntryReadSchemas.SHIM_SCHEMA;
+ case COMMIT_META:
+ return ArchivedMetaEntryReadSchemas.SCHEMA_WITH_COMMIT_META;
+ case PLAN:
+ return ArchivedMetaEntryReadSchemas.SCHEMA_WITH_PLAN;
+ default:
+ throw new AssertionError("Unexpected");
+ }
+ }
+
+ /**
+   * Returns whether the instant time range of the given file overlaps with the filter.
+ */
+ private static boolean isFileInRange(TimeRangeFilter filter, String fileName) {
+ String minInstant = getMinInstantTime(fileName);
+ String maxInstant = getMaxInstantTime(fileName);
+ return filter.isInRange(minInstant) || filter.isInRange(maxInstant);
+ }
+
+ /**
+ * Returns the latest snapshot version.
+ */
+ public static int latestSnapshotVersion(HoodieTableMetaClient metaClient) throws IOException {
+ return allSnapshotVersions(metaClient).stream().max(Integer::compareTo).orElse(-1);
+ }
+
+ /**
+ * Returns all the valid snapshot versions.
+ */
+  public static List<Integer> allSnapshotVersions(HoodieTableMetaClient metaClient) throws IOException {
+ return Arrays.stream(metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter()))
+ .map(fileStatus -> fileStatus.getPath().getName())
+ .map(HoodieArchivedTimeline::getSnapshotVersion)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Returns the latest snapshot metadata files.
+ */
+  public static List<String> latestSnapshotFiles(HoodieTableMetaClient metaClient) throws IOException {
+ int latestVersion = latestSnapshotVersion(metaClient);
+ return latestSnapshotFiles(metaClient, latestVersion);
+ }
+
+ /**
+ * Reads the file list from the manifest file for the latest snapshot.
+ */
+  public static List<String> latestSnapshotFiles(HoodieTableMetaClient metaClient, int latestVersion) {
+ if (latestVersion < 0) {
+ // there is no valid snapshot of the timeline.
+ return Collections.emptyList();
+ }
+ // read and deserialize the valid files.
+ byte[] content = FileIOUtils.readDataFromPath(metaClient.getFs(), getManifestFilePath(metaClient, latestVersion)).get();
+ return Arrays.stream(new String(content).split(",")).collect(Collectors.toList());
+ }
+
+ /**
+ * Returns the full manifest file path with given version number.
+ */
+ public static Path getManifestFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) {
+ return new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + snapshotVersion);
+ }
+
+ /**
+ * Returns the full version file path with given version number.
+ */
+ public static Path getVersionFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) {
+ return new Path(metaClient.getArchivePath(), VERSION_FILE_PREFIX + snapshotVersion);
+ }
+
+ /**
+ * List all the version files.
+ */
+ public static FileStatus[] listAllVersionFiles(HoodieTableMetaClient metaClient) throws IOException {
+ return metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter());
+ }
+
+ /**
+   * List all the manifest files.
+ */
+ public static FileStatus[] listAllManifestFiles(HoodieTableMetaClient metaClient) throws IOException {
+ return metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getManifestFilePathFilter());
+ }
+
+ /**
+ * List all the parquet metadata files.
+ */
+ public static FileStatus[] listAllMetaFiles(HoodieTableMetaClient metaClient) throws IOException {
+ return metaClient.getFs().globStatus(
+ new Path(metaClient.getArchivePath() + "/*.parquet"));
+ }
+
+ /**
+ * Parse the snapshot version from the version file name.
+ */
+ public static int getSnapshotVersion(String fileName) {
+ return Integer.parseInt(fileName.split("_")[2]);
+ }
+
+ /**
+ * Parse the snapshot version from the manifest file name.
+ */
+ public static int getManifestVersion(String fileName) {
+ return Integer.parseInt(fileName.split("_")[1]);
+ }
+
+ /**
+ * Parse the layer number from the file name.
+ */
+ public static int getFileLayer(String fileName) {
+ try {
+ Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+ if (fileMatcher.matches()) {
+ return Integer.parseInt(fileMatcher.group(3));
+ }
+ } catch (NumberFormatException e) {
+ // log and ignore any format warnings
+      LOG.warn("error getting file layer for archived file: " + fileName);
+ }
+
+ // return default value in case of any errors
+ return FILE_LAYER_ZERO;
+ }
+
+ /**
+ * Parse the minimum instant time from the file name.
+ */
+ public static String getMinInstantTime(String fileName) {
+ Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+ if (fileMatcher.matches()) {
+ return fileMatcher.group(1);
+ } else {
+ throw new HoodieException("Unexpected archival file name: " + fileName);
+ }
+ }
+
+ /**
+ * Parse the maximum instant time from the file name.
+ */
+ public static String getMaxInstantTime(String fileName) {
+ Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+ if (fileMatcher.matches()) {
+ return fileMatcher.group(2);
+ } else {
+ throw new HoodieException("Unexpected archival file name: " + fileName);
}
}
- private static class TimeRangeFilter {
+ /**
+ * Returns whether a file belongs to the specified layer {@code layer} within the LSM layout.
+ */
+ public static boolean isFileFromLayer(String fileName, int layer) {
+ return getFileLayer(fileName) == layer;
+ }
+
+ /**
+ * Returns a path filter for the version pointer files.
+ */
+ public static PathFilter getVersionFilePathFilter() {
+ return path -> path.getName().startsWith(VERSION_FILE_PREFIX);
+ }
+
+ /**
+ * Returns a path filter for the manifest files.
+ */
+ public static PathFilter getManifestFilePathFilter() {
+ return path -> path.getName().startsWith(MANIFEST_FILE_PREFIX);
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /**
+ * Different mode for loading the archived instant metadata.
+ */
+ private enum LoadMode {
+ /**
+ * Loads the instantTime, completionTime, action.
+ */
+ SHIM,
+ /**
+ * Loads the instantTime, completionTime, action, metadata.
+ */
+ COMMIT_META,
+ /**
+ * Loads the instantTime, completionTime, plan.
+ */
+ PLAN
+ }
+
+ /**
+ * A time based filter with range [startTs, endTs].
+ */
+ public static class TimeRangeFilter {
private final String startTs;
private final String endTs;
@@ -332,11 +528,14 @@ public TimeRangeFilter(String startTs, String endTs) {
this.endTs = endTs;
}
- public boolean isInRange(HoodieInstant instant) {
- return HoodieTimeline.isInRange(instant.getTimestamp(), this.startTs, this.endTs);
+ public boolean isInRange(String instantTime) {
+ return HoodieTimeline.isInRange(instantTime, this.startTs, this.endTs);
}
}
+ /**
+ * A time based filter with range [startTs, +∞).
+ */
private static class StartTsFilter extends TimeRangeFilter {
private final String startTs;
@@ -345,42 +544,18 @@ public StartTsFilter(String startTs) {
this.startTs = startTs;
}
- public boolean isInRange(HoodieInstant instant) {
- return HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN_OR_EQUALS, startTs);
+ public boolean isInRange(String instantTime) {
+ return HoodieTimeline.compareTimestamps(instantTime, GREATER_THAN_OR_EQUALS, startTs);
}
}
/**
- * Sort files by reverse order of version suffix in file name.
+ * Sort files by order of min instant time in file name.
*/
-  public static class ArchiveFileVersionComparator implements Comparator<FileStatus>, Serializable {
+  public static class ArchiveParquetVersionComparator implements Comparator<String>, Serializable {
@Override
- public int compare(FileStatus f1, FileStatus f2) {
- return Integer.compare(getArchivedFileSuffix(f2), getArchivedFileSuffix(f1));
+ public int compare(String f1, String f2) {
+ return f1.compareTo(f2);
}
-
- private int getArchivedFileSuffix(FileStatus f) {
- try {
- Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(f.getPath().getName());
- if (fileMatcher.matches()) {
- return Integer.parseInt(fileMatcher.group(1));
- }
- } catch (NumberFormatException e) {
- // log and ignore any format warnings
- LOG.warn("error getting suffix for archived file: " + f.getPath());
- }
-
- // return default value in case of any errors
- return 0;
- }
- }
-
- @Override
- public HoodieDefaultTimeline getWriteTimeline() {
- // filter in-memory instants
- Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
- return new HoodieDefaultTimeline(getInstantsAsStream().filter(i ->
- readCommits.containsKey(i.getTimestamp()))
- .filter(s -> validActions.contains(s.getAction())), details);
}
}
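The replacement comparator relies on plain lexicographic ordering of the file names; a brief sketch of why that matches chronological order, assuming the fixed-width instant-time prefix of each LSM file name:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Fixed-width instant-time prefixes make lexicographic order equal to chronological order.
    List<String> files = new ArrayList<>();
    files.add("20230716110000_20230716120000_0.parquet");
    files.add("20230716090000_20230716100000_1.parquet");
    files.sort(Comparator.naturalOrder());
    // files.get(0) now starts with 20230716090000, i.e. the oldest min instant comes first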
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 901530b11d6ed..787740f6c15ee 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -49,7 +49,7 @@ public class HoodieInstant implements Serializable, Comparable {
/**
* A COMPACTION action eventually becomes COMMIT when completed. So, when grouping instants
- * for state transitions, this needs to be taken into account
+ * for state transitions, this needs to be taken into account.
*/
private static final Map<String, String> COMPARABLE_ACTIONS = createComparableActionsMap();
@@ -240,7 +240,7 @@ public String getFileName() {
throw new IllegalArgumentException("Cannot get file name for unknown action " + action);
}
- private static final Map<String, String> createComparableActionsMap() {
+ private static Map<String, String> createComparableActionsMap() {
Map<String, String> comparableMap = new HashMap<>();
comparableMap.put(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.COMMIT_ACTION);
comparableMap.put(HoodieTimeline.LOG_COMPACTION_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedMetaEntryReadSchemas.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedMetaEntryReadSchemas.java
new file mode 100644
index 0000000000000..919e2af765140
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedMetaEntryReadSchemas.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.Schema;
+
+/**
+ * Avro read schemas for the different load modes of archived instants.
+ */
+public abstract class ArchivedMetaEntryReadSchemas {
+ public static final Schema SHIM_SCHEMA = new Schema.Parser().parse("{\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"HoodieArchivedMetaEntryV2\",\n"
+ + " \"namespace\":\"org.apache.hudi.avro.model\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"instantTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"completionTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"action\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " }\n"
+ + " ]\n"
+ + "}");
+
+ public static final Schema SCHEMA_WITH_COMMIT_META = new Schema.Parser().parse("{\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"HoodieArchivedMetaEntryV2\",\n"
+ + " \"namespace\":\"org.apache.hudi.avro.model\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"instantTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"completionTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"action\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"metadata\",\n"
+ + " \"type\":[\"null\", \"bytes\"],\n"
+ + " \"default\": null\n"
+ + " }\n"
+ + " ]\n"
+ + "}");
+
+ public static final Schema SCHEMA_WITH_PLAN = new Schema.Parser().parse("{\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"HoodieArchivedMetaEntryV2\",\n"
+ + " \"namespace\":\"org.apache.hudi.avro.model\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"instantTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"completionTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"action\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"plan\",\n"
+ + " \"type\":[\"null\", \"bytes\"],\n"
+ + " \"default\": null\n"
+ + " }\n"
+ + " ]\n"
+ + "}");
+}
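To make the projection idea concrete, a self-contained sketch (not how Hudi wires it internally, where the entries live in parquet files) that writes an entry with SCHEMA_WITH_COMMIT_META and reads it back with SHIM_SCHEMA, so Avro schema resolution skips the metadata bytes during decoding:

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.hudi.common.util.ArchivedMetaEntryReadSchemas;

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class ArchivedEntryProjectionSketch {
      public static void main(String[] args) throws Exception {
        // Build a full entry that carries the serialized commit metadata.
        GenericRecord full = new GenericData.Record(ArchivedMetaEntryReadSchemas.SCHEMA_WITH_COMMIT_META);
        full.put("instantTime", "20230716103319");
        full.put("completionTime", "20230716103325");
        full.put("action", "commit");
        full.put("metadata", ByteBuffer.wrap("{}".getBytes(StandardCharsets.UTF_8)));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(ArchivedMetaEntryReadSchemas.SCHEMA_WITH_COMMIT_META).write(full, encoder);
        encoder.flush();

        // Read it back with the shim projection: only instantTime/completionTime/action are decoded.
        GenericRecord shim = new GenericDatumReader<GenericRecord>(
            ArchivedMetaEntryReadSchemas.SCHEMA_WITH_COMMIT_META, ArchivedMetaEntryReadSchemas.SHIM_SCHEMA)
            .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(shim); // prints only the three projected fields, no metadata
      }
    }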
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
index af65bac055c30..b15ce11fd530e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
@@ -18,13 +18,14 @@
package org.apache.hudi.io.storage;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
import java.io.IOException;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
@@ -44,5 +45,5 @@ protected ClosableIterator getIndexedRecordIterator(Schema reader
return getIndexedRecordIterator(readerSchema, readerSchema);
}
- protected abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
+ public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
index 3d6533a342919..26a4001039ec8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
@@ -192,7 +192,7 @@ public Set filterRowKeys(Set candidateRowKeys) {
}
@Override
- protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
+ public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
if (!Objects.equals(readerSchema, requestedSchema)) {
throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
index 1420424a58b01..00ba9fc3bb065 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
@@ -72,7 +72,7 @@ public Set filterRowKeys(Set candidateRowKeys) {
}
@Override
- protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
+ public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
if (!Objects.equals(readerSchema, requestedSchema)) {
throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index ad4d1f16a60ce..3dd070fa0a7f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -95,7 +95,7 @@ protected ClosableIterator getIndexedRecordIterator(Schema schema
}
@Override
- protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+ public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
return getIndexedRecordIteratorInternal(readerSchema, Option.of(requestedSchema));
}
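With getIndexedRecordIterator now public, callers such as the archived timeline loader can request a column projection directly. A hedged sketch, assuming a reader instance is obtained elsewhere and that ClosableIterator.close() declares no checked exception; note that only the parquet reader honors projections, while the HFile and ORC readers above throw UnsupportedOperationException for mismatched schemas.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.util.ArchivedMetaEntryReadSchemas;
    import org.apache.hudi.common.util.collection.ClosableIterator;
    import org.apache.hudi.io.storage.HoodieAvroFileReaderBase;

    import java.io.IOException;

    // Hypothetical helper: scan an archived parquet file but decode only the shim columns.
    static void printArchivedInstantTimes(HoodieAvroFileReaderBase reader, Schema writerSchema) throws IOException {
      ClosableIterator<IndexedRecord> iterator =
          reader.getIndexedRecordIterator(writerSchema, ArchivedMetaEntryReadSchemas.SHIM_SCHEMA);
      try {
        while (iterator.hasNext()) {
          System.out.println(iterator.next().get(0)); // field 0 of the shim projection is instantTime
        }
      } finally {
        iterator.close(); // release the underlying file reader
      }
    }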
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index d0f2ef025102f..4370d7b9c6722 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -26,7 +26,7 @@
import java.io.IOException;
import java.util.Properties;
-public interface HoodieFileWriter {
+public interface HoodieFileWriter extends AutoCloseable {
boolean canWrite();
void writeWithMetadata(HoodieKey key, HoodieRecord record, Schema schema, Properties props) throws IOException;
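Since HoodieFileWriter now extends AutoCloseable, call sites can rely on try-with-resources instead of closing the writer manually; a minimal sketch with an illustrative helper (the helper itself is not part of the patch):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.io.storage.HoodieFileWriter;

    import java.util.Properties;

    // Hypothetical call site: the writer is closed even if the write throws.
    static void writeSingleRecord(HoodieFileWriter writer, HoodieKey key, HoodieRecord<?> record,
                                  Schema schema, Properties props) throws Exception {
      try (HoodieFileWriter w = writer) {
        w.writeWithMetadata(key, record, schema, props);
      }
    }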
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 49095683a2bb5..97450086a3ed2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -230,6 +230,15 @@ public HoodieTestTable addSavepointCommit(String instantTime, HoodieSavepointMet
return this;
}
+ public HoodieCommitMetadata createCommitMetadata(String commitTime, WriteOperationType operationType,
+ List<String> partitions, int filesPerPartition, boolean bootstrap) {
+ Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = getPartitionFiles(partitions,
+ filesPerPartition);
+ HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(operationType,
+ metaClient.getTableType(), commitTime, partitionToFilesNameLengthMap);
+ return createCommitMetadata(operationType, commitTime, testTableState, bootstrap);
+ }
+
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime,
HoodieTestTableState testTableState) {
String actionType = getCommitActionType(operationType, metaClient.getTableType());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala
new file mode 100644
index 0000000000000..5ee5dcdff545c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DummyInstantTriple
+import org.apache.hudi.client.HoodieTimelineArchiver
+import org.apache.hudi.client.common.HoodieJavaEngineContext
+import org.apache.hudi.client.utils.InstantTriple
+import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.table.timeline.{HoodieArchivedTimeline, HoodieInstant}
+import org.apache.hudi.common.testutils.{HoodieTestTable, HoodieTestUtils}
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.index.HoodieIndex.IndexType
+import org.apache.hudi.table.HoodieJavaTable
+import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
+
+import java.util
+import scala.collection.JavaConverters._
+
+object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase {
+
+ /**
+ * Java HotSpot(TM) 64-Bit Server VM 1.8.0_351-b10 on Mac OS X 13.4.1
+ * Apple M2
+ * perf load archived instants: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+ * ------------------------------------------------------------------------------------------------------------------------
+ * read shim instants 18 32 15 0.1 17914.8 1.0X
+ * read instants with commit metadata 19 25 5 0.1 19403.1 0.9X
+ */
+ private def readArchivedInstantsBenchmark(): Unit = {
+ withTempDir(f => {
+ val tableName = "testTable"
+ val tablePath = new Path(f.getCanonicalPath, tableName).toUri.toString
+ val metaClient = HoodieTestUtils.init(new Configuration(), tablePath, HoodieTableType.COPY_ON_WRITE, tableName)
+
+ val writeConfig = HoodieWriteConfig.newBuilder().withPath(tablePath)
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
+ .withMarkersType("DIRECT")
+ .build()
+ val engineContext = new HoodieJavaEngineContext(new Configuration())
+ val archiver = new HoodieTimelineArchiver(writeConfig, HoodieJavaTable.create(writeConfig, engineContext).asInstanceOf[HoodieJavaTable[HoodieAvroPayload]])
+
+ val startTs = System.currentTimeMillis()
+ val startInstant = startTs + 1 + ""
+ val commitsNum = 10000000
+ val batchSize = 2000
+ val instantBuffer = new util.ArrayList[InstantTriple]()
+ for (i <- 1 to commitsNum) {
+ val instantTime = startTs + i + ""
+ val action = if (i % 2 == 0) "delta_commit" else "commit"
+ val instant = new HoodieInstant(HoodieInstant.State.COMPLETED, action, instantTime, instantTime + 1000)
+ val metadata = HoodieTestTable.of(metaClient).createCommitMetadata(instantTime, WriteOperationType.INSERT, util.Arrays.asList("par1", "par2"), 10, false).toJsonString.getBytes()
+ instantBuffer.add(new DummyInstantTriple(instant, metadata))
+ if (i % batchSize == 0) {
+ // archive the buffered batch of instants (batchSize per call)
+ archiver.archive(engineContext, instantBuffer)
+ archiver.compactAndClean(engineContext)
+ instantBuffer.clear()
+ }
+ }
+
+ val benchmark = new HoodieBenchmark("pref load archived instants", commitsNum, 3)
+ benchmark.addCase("read shim instants") { _ =>
+ new HoodieArchivedTimeline(metaClient)
+ }
+ benchmark.addCase("read instants with commit metadata") { _ =>
+ new HoodieArchivedTimeline(metaClient, startInstant)
+ }
+ benchmark.run()
+ val totalSize = HoodieArchivedTimeline.latestSnapshotFiles(metaClient).asScala
+ .map(name => metaClient.getFs.getFileStatus(new Path(metaClient.getArchivePath, name)).getLen)
+ .sum
+ println("Total file size in bytes: " + totalSize)
+ })
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ readArchivedInstantsBenchmark()
+ }
+}