diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index c9cebb1b227f6..0a21dd71d2b46 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -174,8 +174,8 @@ public String compactionShowArchived(
HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
try {
archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime);
- HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
- archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema());
+ HoodieCompactionPlan compactionPlan =
+ TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition);
} finally {
archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime);
@@ -365,17 +365,10 @@ Function compactionPlanReader(
private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
HoodieInstant instant) {
- // filter inflight compaction
- if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())
- && HoodieInstant.State.INFLIGHT.equals(instant.getState())) {
- try {
- return TimelineMetadataUtils.deserializeAvroRecordMetadata(archivedTimeline.getInstantDetails(instant).get(),
- HoodieCompactionPlan.getClassSchema());
- } catch (Exception e) {
- throw new HoodieException(e.getMessage(), e);
- }
- } else {
- return null;
+ try {
+ return TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
+ } catch (Exception e) {
+ throw new HoodieException(e.getMessage(), e);
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 3d41e3011bdd2..71717e2ae37c5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -19,33 +19,23 @@
package org.apache.hudi.client;
-import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
-import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
+import org.apache.hudi.avro.model.HoodieArchivedInstant;
import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.ActiveInstant;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.fs.StorageSchemes;
-import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
@@ -53,11 +43,17 @@
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
@@ -66,19 +62,20 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -105,9 +102,8 @@ public class HoodieTimelineArchiver {
private static final Logger LOG = LoggerFactory.getLogger(HoodieTimelineArchiver.class);
- private final Path archiveFilePath;
private final HoodieWriteConfig config;
- private Writer writer;
+ private HoodieWriteConfig writerConfig;
private final int maxInstantsToKeep;
private final int minInstantsToKeep;
private final HoodieTable table;
@@ -118,7 +114,6 @@ public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable
this.config = config;
this.table = table;
this.metaClient = table.getMetaClient();
- this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
HoodieTimeline completedCommitsTimeline = table.getCompletedCommitsTimeline();
Option latestCommit = completedCommitsTimeline.lastInstant();
@@ -188,40 +183,24 @@ private Option getEarliestCommitToRetain(Option la
return earliestCommitToRetain;
}
- private Writer openWriter() {
- try {
- if (this.writer == null) {
- return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent())
- .withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
- .withFs(metaClient.getFs()).overBaseCommit("").build();
- } else {
- return this.writer;
- }
- } catch (IOException e) {
- throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
- }
- }
-
- public Writer reOpenWriter() {
- try {
- if (this.writer != null) {
- this.writer.close();
- this.writer = null;
- }
- this.writer = openWriter();
- return writer;
- } catch (IOException e) {
- throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
+ /**
+ * Get or create a writer config for parquet writer.
+ */
+ private HoodieWriteConfig getOrCreateWriterConfig() {
+ if (this.writerConfig == null) {
+ this.writerConfig = HoodieWriteConfig.newBuilder()
+ .withProperties(this.config.getProps())
+ .withPopulateMetaFields(false).build();
}
+ return this.writerConfig;
}
- private void close() {
+ private HoodieFileWriter openWriter(Path filePath) {
try {
- if (this.writer != null) {
- this.writer.close();
- }
+ return HoodieFileWriterFactory.getFileWriter("", filePath, metaClient.getHadoopConf(), getOrCreateWriterConfig(),
+ HoodieArchivedInstant.getClassSchema(), table.getTaskContextSupplier(), HoodieRecordType.AVRO);
} catch (IOException e) {
- throw new HoodieException("Unable to close HoodieLogFormat writer", e);
+ throw new HoodieException("Unable to initialize archiving writer", e);
}
}
@@ -238,207 +217,218 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
// there is no owner or instant time per se for archival.
txnManager.beginTransaction(Option.empty(), Option.empty());
}
- List instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
- verifyLastMergeArchiveFilesIfNecessary(context);
+ // Sort again because the cleaning and rollback instants could break the sequence.
+ List instantsToArchive = getInstantsToArchive().sorted().collect(Collectors.toList());
boolean success = true;
if (!instantsToArchive.isEmpty()) {
- this.writer = openWriter();
LOG.info("Archiving instants " + instantsToArchive);
archive(context, instantsToArchive);
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive, context);
+ // triggers compaction and cleaning only after archiving action
+ compactAndClean(context);
} else {
LOG.info("No Instants to archive");
}
-
- if (shouldMergeSmallArchiveFiles()) {
- mergeArchiveFilesIfNecessary(context);
- }
return success;
} finally {
- close();
if (acquireLock) {
txnManager.endTransaction(Option.empty());
}
}
}
- public boolean shouldMergeSmallArchiveFiles() {
- return config.getArchiveMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme());
- }
-
/**
- * Here Hoodie can merge the small archive files into a new larger one.
- * Only used for filesystem which does not support append operation.
- * The whole merge small archive files operation has four stages:
- * 1. Build merge plan with merge candidates/merged file name infos.
- * 2. Do merge.
- * 3. Delete all the candidates.
- * 4. Delete the merge plan.
+ * Compacts the small archive files.
+ *
+ *
The parquet naming convention is:
+ *
+ *
${min_instant}_${max_instant}_${level}.parquet
+ *
+ *
The 'min_instant' and 'max_instant' represent the instant time range of the parquet file.
+ * The 'level' represents the number of the level where the file is located, currently we
+ * have no limit for the number of layers.
+ *
+ *
These archive parquet files composite as an LSM tree layout, one parquet file contains
+ * a consecutive timestamp instant metadata entries. Different parquet files may have
+ * overlapping with the instant time ranges.
+ *
+ *
Compaction and cleaning: once the files number exceed a threshold(now constant 10) N,
+ * the oldest N files are then replaced with a compacted file in the next layer.
+ * A cleaning action is triggered right after the compaction.
*
* @param context HoodieEngineContext
- * @throws IOException
*/
- private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
- Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
- // Flush remained content if existed and open a new write
- reOpenWriter();
- // List all archive files
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
- // Sort files by version suffix in reverse (implies reverse chronological order)
- Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
+ @VisibleForTesting
+ public void compactAndClean(HoodieEngineContext context) throws IOException {
+ // 1. List all the latest snapshot files
+ List latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
+ int layer = 0;
+ // 2. triggers the compaction for L0
+ Option compactedFileName = doCompact(latestSnapshotFiles, layer);
+ while (compactedFileName.isPresent()) {
+ // 3. once a compaction had been executed for the current layer,
+ // continues to trigger compaction for the next layer.
+ latestSnapshotFiles.add(compactedFileName.get());
+ compactedFileName = doCompact(latestSnapshotFiles, ++layer);
+ }
+
+ // cleaning
+ clean(context, layer + 1);
+ }
+
+ private Option doCompact(List latestSnapshotFiles, int layer) throws IOException {
+ // 1. list all the files that belong to current layer
+ List files = latestSnapshotFiles
+ .stream().filter(file -> HoodieArchivedTimeline.isFileFromLayer(file, layer)).collect(Collectors.toList());
int archiveMergeFilesBatchSize = config.getArchiveMergeFilesBatchSize();
- long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
-
- List mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
-
- if (mergeCandidate.size() >= archiveMergeFilesBatchSize) {
- List candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
- // before merge archive files build merge plan
- String logFileName = computeLogFileName();
- buildArchiveMergePlan(candidateFiles, planPath, logFileName);
- // merge archive files
- mergeArchiveFiles(mergeCandidate);
- // after merge, delete the small archive files.
- deleteFilesParallelize(metaClient, candidateFiles, context, true);
- LOG.info("Success to delete replaced small archive files.");
- // finally, delete archiveMergePlan which means merging small archive files operation is successful.
- metaClient.getFs().delete(planPath, false);
- LOG.info("Success to merge small archive files.");
+
+ if (files.size() >= archiveMergeFilesBatchSize) {
+ // 2. sort files by min instant time (implies ascending chronological order)
+ files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
+ List candidateFiles = files.stream()
+ .limit(archiveMergeFilesBatchSize)
+ .collect(Collectors.toList());
+ String compactedFileName = compactedFileName(candidateFiles);
+
+ // 3. compaction
+ compactArchiveFiles(candidateFiles, compactedFileName);
+ // 4. update the manifest file
+ updateManifest(candidateFiles, compactedFileName);
+ LOG.info("Finishes compaction of archive files: " + candidateFiles);
+ return Option.of(compactedFileName);
}
+ return Option.empty();
}
/**
- * Find the latest 'huge archive file' index as a break point and only check/merge newer archive files.
- * Because we need to keep the original order of archive files which is important when loading archived instants with time filter.
- * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function commitsFilter)
- *
- * @param smallFileLimitBytes small File Limit Bytes
- * @param fsStatuses Sort by version suffix in reverse
- * @return merge candidates
+ * Returns a new file name.
*/
- private List getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
- int index = 0;
- for (; index < fsStatuses.length; index++) {
- if (fsStatuses[index].getLen() > smallFileLimitBytes) {
- break;
- }
- }
- return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+ private static String newFileName(String minInstant, String maxInstant, int layer) {
+ return minInstant + "_" + maxInstant + "_" + layer + HoodieFileFormat.PARQUET.getFileExtension();
}
/**
- * Get final written archive file name based on storageSchemes which does not support append.
+ * Returns a new file name.
*/
- private String computeLogFileName() throws IOException {
- String logWriteToken = writer.getLogFile().getLogWriteToken();
- HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
- return hoodieLogFile.getFileName();
+ @VisibleForTesting
+ public static String compactedFileName(List files) {
+ String minInstant = files.stream().map(HoodieArchivedTimeline::getMinInstantTime)
+ .min(Comparator.naturalOrder()).get();
+ String maxInstant = files.stream().map(HoodieArchivedTimeline::getMaxInstantTime)
+ .max(Comparator.naturalOrder()).get();
+ int currentLayer = HoodieArchivedTimeline.getFileLayer(files.get(0));
+ return newFileName(minInstant, maxInstant, currentLayer + 1);
}
/**
- * Check/Solve if there is any failed and unfinished merge small archive files operation
+ * Checks whether there is any unfinished compaction operation.
*
* @param context HoodieEngineContext used for parallelize to delete small archive files if necessary.
- * @throws IOException
*/
- private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
- if (shouldMergeSmallArchiveFiles()) {
- Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
- HoodieWrapperFileSystem fs = metaClient.getFs();
- // If plan exist, last merge small archive files was failed.
- // we need to revert or complete last action.
- if (fs.exists(planPath)) {
- HoodieMergeArchiveFilePlan plan = null;
- try {
- plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fs, planPath).get(), HoodieMergeArchiveFilePlan.class);
- } catch (IOException e) {
- LOG.warn("Parsing merge archive plan failed.", e);
- // Reading partial plan file which means last merge action is failed during writing plan file.
- fs.delete(planPath);
- return;
+ private void clean(HoodieEngineContext context, int compactedVersions) throws IOException {
+ // if there are more than 3 version of snapshots, clean the oldest files.
+ List allSnapshotVersions = HoodieArchivedTimeline.allSnapshotVersions(metaClient);
+ int numVersionsToKeep = 3 + compactedVersions; // should make the threshold configurable.
+ if (allSnapshotVersions.size() > numVersionsToKeep) {
+ allSnapshotVersions.sort((v1, v2) -> v2 - v1);
+ List versionsToKeep = allSnapshotVersions.subList(0, numVersionsToKeep);
+ Set filesToKeep = versionsToKeep.stream()
+ .flatMap(version -> HoodieArchivedTimeline.latestSnapshotFiles(metaClient, version).stream())
+ .collect(Collectors.toSet());
+ // delete the manifest file first
+ List metaFilesToClean = new ArrayList<>();
+ Arrays.stream(HoodieArchivedTimeline.listAllVersionFiles(metaClient)).forEach(fileStatus -> {
+ if (!versionsToKeep.contains(HoodieArchivedTimeline.getSnapshotVersion(fileStatus.getPath().getName()))) {
+ metaFilesToClean.add(fileStatus.getPath().toString());
}
- Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
- List candidates = plan.getCandidate().stream().map(Path::new).collect(Collectors.toList());
- if (candidateAllExists(candidates)) {
- // Last merge action is failed during writing merged archive file.
- // But all the small archive files are not deleted.
- // Revert last action by deleting mergedArchiveFile if existed.
- if (fs.exists(mergedArchiveFile)) {
- fs.delete(mergedArchiveFile, false);
- }
- } else {
- // Last merge action is failed during deleting small archive files.
- // But the merged files is completed.
- // Try to complete last action
- if (fs.exists(mergedArchiveFile)) {
- deleteFilesParallelize(metaClient, plan.getCandidate(), context, true);
- }
+ });
+ Arrays.stream(HoodieArchivedTimeline.listAllManifestFiles(metaClient)).forEach(fileStatus -> {
+ if (!versionsToKeep.contains(HoodieArchivedTimeline.getManifestVersion(fileStatus.getPath().getName()))) {
+ metaFilesToClean.add(fileStatus.getPath().toString());
}
-
- fs.delete(planPath);
- }
+ });
+ deleteFilesParallelize(metaClient, metaFilesToClean, context, false);
+ // delete the archive data files
+ List dataFilesToClean = Arrays.stream(HoodieArchivedTimeline.listAllMetaFiles(metaClient))
+ .filter(fileStatus -> !filesToKeep.contains(fileStatus.getPath().getName()))
+ .map(fileStatus -> fileStatus.getPath().toString())
+ .collect(Collectors.toList());
+ deleteFilesParallelize(metaClient, dataFilesToClean, context, false);
}
}
- /**
- * If all the candidate small archive files existed, last merge operation was failed during writing the merged archive file.
- * If at least one of candidate small archive files existed, the merged archive file was created and last operation was failed during deleting the small archive files.
- */
- private boolean candidateAllExists(List candidates) throws IOException {
- for (Path archiveFile : candidates) {
- if (!metaClient.getFs().exists(archiveFile)) {
- // candidate is deleted
- return false;
- }
- }
- return true;
+ public void updateManifest(String fileToAdd) throws IOException {
+ // 1. read the latest manifest version file;
+ // 2. read the latest manifest file for valid files;
+ // 3. add this new file to the existing file list from step2.
+ int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient);
+ List latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient, latestVersion);
+ List newFileList = new ArrayList<>(latestSnapshotFiles);
+ newFileList.add(fileToAdd);
+ createManifestFile(newFileList, latestVersion);
}
- public void buildArchiveMergePlan(List compactCandidate, Path planPath, String compactedArchiveFileName) throws IOException {
- LOG.info("Start to build archive merge plan.");
- HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder()
- .setCandidate(compactCandidate)
- .setMergedArchiveFileName(compactedArchiveFileName)
- .build();
- Option content = TimelineMetadataUtils.serializeAvroMetadata(plan, HoodieMergeArchiveFilePlan.class);
- // building merge archive files plan.
- FileIOUtils.createFileInPath(metaClient.getFs(), planPath, content);
- LOG.info("Success to build archive merge plan");
+ public void updateManifest(List filesToRemove, String fileToAdd) throws IOException {
+ // 1. read the latest manifest version file;
+ // 2. read the latest manifest file for valid files;
+ // 3. remove files to the existing file list from step2;
+ // 4. add this new file to the existing file list from step2.
+ int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient);
+ List latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient, latestVersion);
+ List newFileList = new ArrayList<>(latestSnapshotFiles);
+ newFileList.removeAll(filesToRemove);
+ newFileList.add(fileToAdd);
+ createManifestFile(newFileList, latestVersion);
}
- public void mergeArchiveFiles(List compactCandidate) throws IOException {
+ private void createManifestFile(List newFileList, int currentVersion) {
+ byte[] content = String.join(",", newFileList).getBytes(StandardCharsets.UTF_8);
+ // version starts from 1 and increases monotonically
+ int newVersion = currentVersion < 0 ? 1 : currentVersion + 1;
+ // create manifest file
+ FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getManifestFilePath(metaClient, newVersion), Option.of(content));
+ // create version file
+ FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getVersionFilePath(metaClient, newVersion), Option.empty());
+ }
+
+ public void compactArchiveFiles(List candidateFiles, String compactedFileName) {
LOG.info("Starting to merge small archive files.");
- Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
- try {
- List records = new ArrayList<>();
- for (FileStatus fs : compactCandidate) {
+ try (HoodieFileWriter writer = openWriter(new Path(metaClient.getArchivePath(), compactedFileName))) {
+ for (String fileName : candidateFiles) {
// Read the archived file
- try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
- new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
- // Read the avro blocks
- while (reader.hasNext()) {
- HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
- if (records.size() >= this.config.getCommitArchivalBatchSize()) {
- writeToFile(wrapperSchema, records);
+ try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ .getFileReader(metaClient.getHadoopConf(), new Path(metaClient.getArchivePath(), fileName))) {
+ // Read the meta entry
+ try (ClosableIterator iterator = reader.getIndexedRecordIterator(HoodieArchivedInstant.getClassSchema(), HoodieArchivedInstant.getClassSchema())) {
+ while (iterator.hasNext()) {
+ IndexedRecord record = iterator.next();
+ writer.write(record.get(0).toString(), new HoodieAvroIndexedRecord(record), HoodieArchivedInstant.getClassSchema());
}
}
}
}
- writeToFile(wrapperSchema, records);
} catch (Exception e) {
throw new HoodieCommitException("Failed to merge small archive files", e);
- } finally {
- writer.close();
}
LOG.info("Success to merge small archive files.");
}
- private Map deleteFilesParallelize(HoodieTableMetaClient metaClient, List paths, HoodieEngineContext context, boolean ignoreFailed) {
-
+ private Map deleteFilesParallelize(
+ HoodieTableMetaClient metaClient,
+ List paths,
+ HoodieEngineContext context,
+ boolean ignoreFailed) {
return FSUtils.parallelizeFilesProcess(context,
metaClient.getFs(),
config.getArchiveDeleteParallelism(),
@@ -471,7 +461,7 @@ private Stream getCleanInstantsToArchive() {
if (hoodieInstants.size() > this.maxInstantsToKeep) {
return hoodieInstants.subList(0, hoodieInstants.size() - this.minInstantsToKeep);
} else {
- return new ArrayList();
+ return Collections.emptyList();
}
}).flatMap(Collection::stream);
}
@@ -498,9 +488,7 @@ private Stream getCommitInstantsToArchive() throws IOException {
LESSER_THAN, oldestPendingInstant.get().getTimestamp())).findFirst());
// Check if the completed instant is higher than the oldest inflight instant
// in that case update the oldestCommitToRetain to oldestInflight commit time.
- if (!completedCommitBeforeOldestPendingInstant.isPresent()
- || HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
- LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
+ if (!completedCommitBeforeOldestPendingInstant.isPresent()) {
oldestCommitToRetain = oldestPendingInstant;
} else {
oldestCommitToRetain = completedCommitBeforeOldestPendingInstant;
@@ -567,8 +555,7 @@ private Stream getCommitInstantsToArchive() throws IOException {
}
}
- private Stream getInstantsToArchive() throws IOException {
- Stream instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+ private Stream getInstantsToArchive() throws IOException {
if (config.isMetaserverEnabled()) {
return Stream.empty();
}
@@ -576,8 +563,10 @@ private Stream getInstantsToArchive() throws IOException {
// For archiving and cleaning instants, we need to include intermediate state files if they exist
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
Map, List> groupByTsAction = rawActiveTimeline.getInstantsAsStream()
- .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
- HoodieInstant.getComparableAction(i.getAction()))));
+ .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
+ HoodieInstant.getComparableAction(i.getAction()))));
+
+ Stream instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
// If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
// metadata table.
@@ -626,30 +615,22 @@ private Stream getInstantsToArchive() throws IOException {
}
}
- return instants.flatMap(hoodieInstant -> {
+ return instants.map(hoodieInstant -> {
List instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction())));
- if (instantsToStream != null) {
- return instantsToStream.stream();
- } else {
- // if a concurrent writer archived the instant
- return Stream.empty();
- }
+ return ActiveInstant.fromInstants(instantsToStream);
});
}
- private boolean deleteArchivedInstants(List archivedInstants, HoodieEngineContext context) throws IOException {
+ private boolean deleteArchivedInstants(List archivedInstants, HoodieEngineContext context) {
LOG.info("Deleting instants " + archivedInstants);
List pendingInstants = new ArrayList<>();
List completedInstants = new ArrayList<>();
- for (HoodieInstant instant : archivedInstants) {
- if (instant.isCompleted()) {
- completedInstants.add(instant);
- } else {
- pendingInstants.add(instant);
- }
+ for (ActiveInstant triple : archivedInstants) {
+ completedInstants.add(triple.getCompleted());
+ pendingInstants.addAll(triple.getPendingInstants());
}
context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
@@ -678,52 +659,35 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo
return true;
}
- public void archive(HoodieEngineContext context, List instants) throws HoodieCommitException {
- try {
- Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
- LOG.info("Wrapper schema " + wrapperSchema.toString());
- List records = new ArrayList<>();
- for (HoodieInstant hoodieInstant : instants) {
+ public void archive(HoodieEngineContext context, List instants) throws HoodieCommitException {
+ Path filePath = new Path(metaClient.getArchivePath(),
+ newFileName(instants.get(0).getInstantTime(), instants.get(instants.size() - 1).getInstantTime(), HoodieArchivedTimeline.FILE_LAYER_ZERO));
+ try (HoodieFileWriter writer = openWriter(filePath)) {
+ Schema wrapperSchema = HoodieArchivedInstant.getClassSchema();
+ LOG.info("Archiving schema " + wrapperSchema.toString());
+ for (ActiveInstant triple : instants) {
try {
- deleteAnyLeftOverMarkers(context, hoodieInstant);
- records.add(convertToAvroRecord(hoodieInstant));
- if (records.size() >= this.config.getCommitArchivalBatchSize()) {
- writeToFile(wrapperSchema, records);
- }
+ deleteAnyLeftOverMarkers(context, triple);
+ // in local FS and HDFS, there could be empty completed instants due to crash.
+ final HoodieArchivedInstant metaEntry = MetadataConversionUtils.createArchivedInstant(triple, metaClient);
+ writer.write(metaEntry.getInstantTime(), new HoodieAvroIndexedRecord(metaEntry), wrapperSchema);
} catch (Exception e) {
- LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e);
+ LOG.error("Failed to archive instant: " + triple.getInstantTime(), e);
if (this.config.isFailOnTimelineArchivingEnabled()) {
throw e;
}
}
}
- writeToFile(wrapperSchema, records);
+ updateManifest(filePath.getName());
} catch (Exception e) {
throw new HoodieCommitException("Failed to archive commits", e);
}
}
- private void deleteAnyLeftOverMarkers(HoodieEngineContext context, HoodieInstant instant) {
- WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, instant.getTimestamp());
+ private void deleteAnyLeftOverMarkers(HoodieEngineContext context, ActiveInstant activeInstant) {
+ WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, activeInstant.getInstantTime());
if (writeMarkers.deleteMarkerDir(context, config.getMarkersDeleteParallelism())) {
- LOG.info("Cleaned up left over marker directory for instant :" + instant);
+ LOG.info("Cleaned up left over marker directory for instant :" + activeInstant.getCompleted());
}
}
-
- private void writeToFile(Schema wrapperSchema, List records) throws Exception {
- if (records.size() > 0) {
- Map header = new HashMap<>();
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
- final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
- List indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
- HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
- writer.appendBlock(block);
- records.clear();
- }
- }
-
- private IndexedRecord convertToAvroRecord(HoodieInstant hoodieInstant)
- throws IOException {
- return MetadataConversionUtils.createMetaWrapper(hoodieInstant, metaClient);
- }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/LegacyArchivedMetaEntryReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/LegacyArchivedMetaEntryReader.java
new file mode 100644
index 0000000000000..eaad3d120b04c
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/LegacyArchivedMetaEntryReader.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.StreamSupport;
+
+/**
+ * Tools used for migrating to new LSM tree style archived timeline.
+ */
+public class LegacyArchivedMetaEntryReader {
+ private static final Logger LOG = LoggerFactory.getLogger(LegacyArchivedMetaEntryReader.class);
+
+ private static final Pattern ARCHIVE_FILE_PATTERN =
+ Pattern.compile("^\\.commits_\\.archive\\.([0-9]+).*");
+
+ public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan";
+
+ private static final String ACTION_TYPE_KEY = "actionType";
+ private static final String ACTION_STATE = "actionState";
+ private static final String STATE_TRANSITION_TIME = "stateTransitionTime";
+
+ private final HoodieTableMetaClient metaClient;
+
+ private final Map readCommits;
+
+ public LegacyArchivedMetaEntryReader(HoodieTableMetaClient metaClient) {
+ this.metaClient = metaClient;
+ this.readCommits = new HashMap<>();
+ }
+
+ /**
+ * @deprecated Use {@code HoodieArchivedTimeline#readCommit} instead.
+ */
+ private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
+ final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
+ final String action = record.get(ACTION_TYPE_KEY).toString();
+ final String stateTransitionTime = (String) record.get(STATE_TRANSITION_TIME);
+ if (loadDetails) {
+ getMetadataKey(action).map(key -> {
+ Object actionData = record.get(key);
+ if (actionData != null) {
+ if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
+ this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData));
+ } else {
+ this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ return null;
+ });
+ }
+ return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action,
+ instantTime, stateTransitionTime);
+ }
+
+ @Nonnull
+ private Option getMetadataKey(String action) {
+ switch (action) {
+ case HoodieTimeline.CLEAN_ACTION:
+ return Option.of("hoodieCleanMetadata");
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ return Option.of("hoodieCommitMetadata");
+ case HoodieTimeline.ROLLBACK_ACTION:
+ return Option.of("hoodieRollbackMetadata");
+ case HoodieTimeline.SAVEPOINT_ACTION:
+ return Option.of("hoodieSavePointMetadata");
+ case HoodieTimeline.COMPACTION_ACTION:
+ case HoodieTimeline.LOG_COMPACTION_ACTION:
+ return Option.of("hoodieCompactionPlan");
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ return Option.of("hoodieReplaceCommitMetadata");
+ case HoodieTimeline.INDEXING_ACTION:
+ return Option.of("hoodieIndexCommitMetadata");
+ default:
+ LOG.error(String.format("Unknown action in metadata (%s)", action));
+ return Option.empty();
+ }
+ }
+
+ /**
+ * This is method to read selected instants. Do NOT use this directly use one of the helper methods above
+ * If loadInstantDetails is set to true, this would also update 'readCommits' map with commit details
+ * If filter is specified, only the filtered instants are loaded
+ * If commitsFilter is specified, only the filtered records are loaded
+ *
+ * @deprecated Use {@code HoodieArchivedTimeline.loadInstantsV2} instead.
+ */
+ private List loadInstants(
+ HoodieArchivedTimeline.TimeRangeFilter filter,
+ boolean loadInstantDetails,
+ Function commitsFilter) {
+ try {
+ // List all files
+ FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+ new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+
+ // Sort files by version suffix in reverse (implies reverse chronological order)
+ Arrays.sort(fsStatuses, new ArchiveLogVersionComparator());
+
+ Set instantsInRange = new HashSet<>();
+ for (FileStatus fs : fsStatuses) {
+ // Read the archived file
+ try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
+ new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+ int instantsInPreviousFile = instantsInRange.size();
+ // Read the avro blocks
+ while (reader.hasNext()) {
+ HoodieLogBlock block = reader.next();
+ if (block instanceof HoodieAvroDataBlock) {
+ HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
+ // TODO If we can store additional metadata in datablock, we can skip parsing records
+ // (such as startTime, endTime of records in the block)
+ try (ClosableIterator> itr = avroBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
+ StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
+ // Filter blocks in desired time window
+ .map(r -> (GenericRecord) r.getData())
+ .filter(commitsFilter::apply)
+ .map(r -> readCommit(r, loadInstantDetails))
+ .filter(c -> filter == null || filter.isInRange(c.getTimestamp()))
+ .forEach(instantsInRange::add);
+ }
+ }
+ }
+
+ if (filter != null) {
+ int instantsInCurrentFile = instantsInRange.size() - instantsInPreviousFile;
+ if (instantsInPreviousFile > 0 && instantsInCurrentFile == 0) {
+ // Note that this is an optimization to skip reading unnecessary archived files
+ // This signals we crossed lower bound of desired time window.
+ break;
+ }
+ }
+ } catch (Exception oriException) {
+ // merge small archive files may leave uncompleted archive file which will cause exception.
+ // need to ignore this kind of exception here.
+ try {
+ if (isCorruptCompactionFile(metaClient, fs)) {
+ continue;
+ }
+ throw oriException;
+ } catch (Exception e) {
+ // If anything wrong during parsing merge archive plan, we need to throw the original exception.
+ // For example corrupted archive file and corrupted plan are both existed.
+ throw oriException;
+ }
+ }
+ }
+
+ ArrayList result = new ArrayList<>(instantsInRange);
+ Collections.sort(result);
+ return result;
+ } catch (IOException e) {
+ throw new HoodieIOException(
+ "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
+ }
+ }
+
+ public static boolean isCorruptCompactionFile(HoodieTableMetaClient metaClient, FileStatus fileStatus) throws IOException {
+ Path planPath = new Path(metaClient.getArchivePath(), MERGE_ARCHIVE_PLAN_NAME);
+ HoodieWrapperFileSystem fileSystem = metaClient.getFs();
+ if (fileSystem.exists(planPath)) {
+ HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
+ String mergedArchiveFileName = plan.getMergedArchiveFileName();
+ if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fileStatus.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
+ LOG.warn("Catch exception because of reading uncompleted merging archive file " + mergedArchiveFileName + ". Ignore it here.");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Sort files by reverse order of version suffix in file name.
+ */
+ public static class ArchiveLogVersionComparator implements Comparator, Serializable {
+ @Override
+ public int compare(FileStatus f1, FileStatus f2) {
+ return Integer.compare(getArchivedFileSuffix(f2), getArchivedFileSuffix(f1));
+ }
+ }
+
+ private static int getArchivedFileSuffix(FileStatus f) {
+ try {
+ Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(f.getPath().getName());
+ if (fileMatcher.matches()) {
+ return Integer.parseInt(fileMatcher.group(1));
+ }
+ } catch (NumberFormatException e) {
+ // log and ignore any format warnings
+ LOG.warn("error getting suffix for archived file: " + f.getPath());
+ }
+ // return default value in case of any errors
+ return 0;
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ActiveInstant.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ActiveInstant.java
new file mode 100644
index 0000000000000..52d235b92a48f
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ActiveInstant.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A combination of instants covering action states: requested, inflight, completed.
+ */
+public class ActiveInstant implements Serializable, Comparable {
+ private final HoodieInstant requested;
+ private final HoodieInstant inflight;
+ private final HoodieInstant completed;
+
+ /**
+ * The constructor.
+ */
+ protected ActiveInstant(@Nullable HoodieInstant requested, @Nullable HoodieInstant inflight, HoodieInstant completed) {
+ this.requested = requested;
+ this.inflight = inflight;
+ this.completed = completed;
+ }
+
+ public static ActiveInstant fromInstants(List instants) {
+ ValidationUtils.checkArgument(instants.size() <= 3);
+ HoodieInstant requested = null;
+ HoodieInstant inflight = null;
+ HoodieInstant completed = null;
+ for (HoodieInstant instant : instants) {
+ if (instant.isRequested()) {
+ requested = instant;
+ } else if (instant.isInflight()) {
+ inflight = instant;
+ } else {
+ completed = instant;
+ }
+ }
+ return new ActiveInstant(requested, inflight, Objects.requireNonNull(completed));
+ }
+
+ public List getPendingInstants() {
+ List instants = new ArrayList<>(2);
+ if (this.requested != null) {
+ instants.add(this.requested);
+ }
+ if (this.inflight != null) {
+ instants.add(this.inflight);
+ }
+ return instants;
+ }
+
+ public HoodieInstant getCompleted() {
+ return completed;
+ }
+
+ public String getAction() {
+ return this.completed.getAction();
+ }
+
+ /**
+ * A COMPACTION action eventually becomes COMMIT when completed.
+ */
+ public String getPendingAction() {
+ return getPendingInstant().getAction();
+ }
+
+ public String getInstantTime() {
+ return this.completed.getTimestamp();
+ }
+
+ public String getCompletionTime() {
+ return this.completed.getStateTransitionTime();
+ }
+
+ public Option getCommitMetadata(HoodieTableMetaClient metaClient) {
+ Option content = metaClient.getActiveTimeline().getInstantDetails(this.completed);
+ if (content.isPresent() && content.get().length == 0) {
+ return Option.empty();
+ }
+ return content;
+ }
+
+ public Option getRequestedCommitMetadata(HoodieTableMetaClient metaClient) {
+ if (this.requested != null) {
+ Option requestedContent = metaClient.getActiveTimeline().getInstantDetails(this.requested);
+ if (!requestedContent.isPresent() || requestedContent.get().length == 0) {
+ return Option.empty();
+ } else {
+ return requestedContent;
+ }
+ } else {
+ return Option.empty();
+ }
+ }
+
+ public Option getInflightCommitMetadata(HoodieTableMetaClient metaClient) {
+ if (this.inflight != null) {
+ Option inflightContent = metaClient.getActiveTimeline().getInstantDetails(this.inflight);
+ if (!inflightContent.isPresent() || inflightContent.get().length == 0) {
+ return Option.empty();
+ } else {
+ return inflightContent;
+ }
+ } else {
+ return Option.empty();
+ }
+ }
+
+ public byte[] getCleanPlan(HoodieTableMetaClient metaClient) {
+ return metaClient.getActiveTimeline().readCleanerInfoAsBytes(getPendingInstant()).get();
+ }
+
+ public byte[] getCompactionPlan(HoodieTableMetaClient metaClient) {
+ return metaClient.getActiveTimeline().readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(getInstantTime())).get();
+ }
+
+ public byte[] getLogCompactionPlan(HoodieTableMetaClient metaClient) {
+ return metaClient.getActiveTimeline().readCompactionPlanAsBytes(HoodieTimeline.getLogCompactionRequestedInstant(getInstantTime())).get();
+ }
+
+ private HoodieInstant getPendingInstant() {
+ if (requested != null) {
+ return requested;
+ } else if (inflight != null) {
+ return inflight;
+ } else {
+ throw new AssertionError("Pending instant does not exist.");
+ }
+ }
+
+ @Override
+ public int compareTo(ActiveInstant other) {
+ return this.completed.getTimestamp().compareTo(other.completed.getTimestamp());
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index cfd47ab2b374e..80bad68ba401a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -18,9 +18,7 @@
package org.apache.hudi.client.utils;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
+import org.apache.hudi.avro.model.HoodieArchivedInstant;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
@@ -41,6 +39,12 @@
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
/**
* Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.
*/
@@ -137,6 +141,41 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInst
return archivedMetaWrapper;
}
+ public static HoodieArchivedInstant createArchivedInstant(ActiveInstant activeInstant, HoodieTableMetaClient metaClient) {
+ HoodieArchivedInstant archivedInstant = new HoodieArchivedInstant();
+ archivedInstant.setInstantTime(activeInstant.getInstantTime());
+ archivedInstant.setCompletionTime(activeInstant.getCompletionTime());
+ archivedInstant.setAction(activeInstant.getAction());
+ activeInstant.getCommitMetadata(metaClient).ifPresent(commitMetadata -> archivedInstant.setMetadata(ByteBuffer.wrap(commitMetadata)));
+ switch (activeInstant.getPendingAction()) {
+ case HoodieTimeline.CLEAN_ACTION: {
+ archivedInstant.setPlan(ByteBuffer.wrap(activeInstant.getCleanPlan(metaClient)));
+ break;
+ }
+ case HoodieTimeline.REPLACE_COMMIT_ACTION: {
+ // we may have cases with empty HoodieRequestedReplaceMetadata e.g. insert_overwrite_table or insert_overwrite
+ // without clustering. However, we should revisit the requested commit file standardization
+ activeInstant.getRequestedCommitMetadata(metaClient).ifPresent(metadata -> archivedInstant.setPlan(ByteBuffer.wrap(metadata)));
+ // inflight replacecommit files have the same metadata body as HoodieCommitMetadata,
+ // so we could re-use it without further creating an inflight extension.
+ // Or inflight replacecommit files are empty under clustering circumstance.
+ activeInstant.getInflightCommitMetadata(metaClient).ifPresent(metadata -> archivedInstant.setPlan(ByteBuffer.wrap(metadata)));
+ break;
+ }
+ case HoodieTimeline.COMPACTION_ACTION: {
+ archivedInstant.setPlan(ByteBuffer.wrap(activeInstant.getCompactionPlan(metaClient)));
+ break;
+ }
+ case HoodieTimeline.LOG_COMPACTION_ACTION: {
+ archivedInstant.setPlan(ByteBuffer.wrap(activeInstant.getLogCompactionPlan(metaClient)));
+ break;
+ }
+ default:
+ // no operation
+ }
+ return archivedInstant;
+ }
+
public static HoodieArchivedMetaEntry createMetaWrapperForEmptyInstant(HoodieInstant hoodieInstant) {
HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
index 23eaa276c5538..c4876d3a0a610 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -94,19 +94,6 @@ public class HoodieArchivalConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("The number of small archive files to be merged at once.");
- public static final ConfigProperty ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
- .key("hoodie.archive.merge.small.file.limit.bytes")
- .defaultValue(20L * 1024 * 1024)
- .markAdvanced()
- .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");
-
- public static final ConfigProperty ARCHIVE_MERGE_ENABLE = ConfigProperty
- .key("hoodie.archive.merge.enable")
- .defaultValue(false)
- .markAdvanced()
- .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
- + " useful when storage scheme doesn't support append operation.");
-
public static final ConfigProperty ARCHIVE_BEYOND_SAVEPOINT = ConfigProperty
.key("hoodie.archive.beyond.savepoint")
.defaultValue(false)
@@ -191,16 +178,6 @@ public HoodieArchivalConfig.Builder withArchiveMergeFilesBatchSize(int number) {
return this;
}
- public HoodieArchivalConfig.Builder withArchiveMergeSmallFileLimit(long size) {
- archivalConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
- return this;
- }
-
- public HoodieArchivalConfig.Builder withArchiveMergeEnable(boolean enable) {
- archivalConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
- return this;
- }
-
public HoodieArchivalConfig.Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
archivalConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1581e21c070be..f2f30fb3ec08d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1513,18 +1513,10 @@ public boolean isAutoClean() {
return getBoolean(HoodieCleanConfig.AUTO_CLEAN);
}
- public boolean getArchiveMergeEnable() {
- return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
- }
-
public boolean shouldArchiveBeyondSavepoint() {
return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT);
}
- public long getArchiveMergeSmallFileLimitBytes() {
- return getLong(HoodieArchivalConfig.ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES);
- }
-
public boolean isAutoArchive() {
return getBoolean(HoodieArchivalConfig.AUTO_ARCHIVE);
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyActiveInstant.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyActiveInstant.java
new file mode 100644
index 0000000000000..f8f5b5b2e80be
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyActiveInstant.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.client.utils.ActiveInstant;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * Instant triple for testing.
+ */
+public class DummyActiveInstant extends ActiveInstant {
+ private final byte[] commitMetadata;
+
+ /**
+ * Only for testing purpose.
+ */
+ public DummyActiveInstant(HoodieInstant completed, byte[] commitMetadata) {
+ super(new HoodieInstant(HoodieInstant.State.REQUESTED, completed.getAction(), completed.getTimestamp()),
+ new HoodieInstant(HoodieInstant.State.INFLIGHT, completed.getAction(), completed.getTimestamp()),
+ completed);
+ this.commitMetadata = commitMetadata;
+ }
+
+ @Override
+ public Option getCommitMetadata(HoodieTableMetaClient metaClient) {
+ return Option.of(this.commitMetadata);
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
index c11a29aa4f60c..2569ff434eb81 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
@@ -222,7 +222,7 @@ public static void createPendingCompaction(String instantTime, HoodieTableMetaCl
compactionPlan.setOperations(Arrays.asList(operation));
HoodieTestTable.of(metaClient)
.addRequestedCompaction(instantTime, compactionPlan);
- FileCreateUtils.createPendingInflightCompaction(metaClient.getBasePath(), instantTime);
+ FileCreateUtils.createInflightCompaction(metaClient.getBasePath(), instantTime);
}
public static void createCompleteCompaction(String instantTime, HoodieTableMetaClient metaClient) throws Exception {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
index 3938df3f3afd5..c85078ccf7d08 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utils;
import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -30,7 +31,10 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.stream.Collectors;
+
import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieArchivedInstant;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -41,6 +45,7 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.avro.model.HoodieSliceInfo;
+import org.apache.hudi.client.utils.ActiveInstant;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -48,15 +53,19 @@
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -74,8 +83,8 @@ public void testCompletedClean() throws Exception {
createCleanMetadata(newCommitTime);
HoodieArchivedMetaEntry metaEntry = MetadataConversionUtils.createMetaWrapper(
new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, newCommitTime), metaClient);
- assertEquals(metaEntry.getActionState(), State.COMPLETED.toString());
- assertEquals(metaEntry.getHoodieCleanMetadata().getStartCleanTime(), newCommitTime);
+ assertEquals(State.COMPLETED.toString(), metaEntry.getActionState());
+ assertEquals(newCommitTime, metaEntry.getHoodieCleanMetadata().getStartCleanTime());
}
@Test
@@ -167,6 +176,124 @@ public void testConvertCommitMetadata() {
assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());
}
+ // -------------------------------------------------------------------------
+ // BEGIN: test cases for HoodieArchivedInstant conversion.
+ // -------------------------------------------------------------------------
+
+ @Test
+ public void testArchivedClean() throws Exception {
+ String newCommitTime = HoodieTestTable.makeNewCommitTime();
+ createCleanMetadata(newCommitTime);
+ // test conversion to archived instant
+ HoodieArchivedInstant archived = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime), metaClient);
+ assertEquals(newCommitTime, archived.getInstantTime());
+ assertEquals(HoodieTimeline.CLEAN_ACTION, archived.getAction());
+ assertDoesNotThrow(() -> CleanerUtils.getCleanerMetadata(metaClient, archived.getMetadata().array()));
+ assertDoesNotThrow(() -> TimelineMetadataUtils.deserializeCleanerPlan(archived.getPlan().array()));
+ }
+
+ @Test
+ public void testArchivedReplace() throws Exception {
+ String newCommitTime = HoodieTestTable.makeNewCommitTime();
+ createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE, true);
+ // test conversion to archived instant
+ HoodieArchivedInstant archived = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime), metaClient);
+ assertEquals(newCommitTime, archived.getInstantTime());
+ assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
+ assertDoesNotThrow(() -> HoodieReplaceCommitMetadata.fromBytes(archived.getMetadata().array(), HoodieReplaceCommitMetadata.class));
+ assertDoesNotThrow(() -> TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived.getPlan().array()));
+ }
+
+ @Test
+ public void testArchivedInsertOverwriteWithoutClustering() throws Exception {
+ String newCommitTime = HoodieTestTable.makeNewCommitTime();
+ createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE, false);
+ // test conversion to archived instant
+ HoodieArchivedInstant archived = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime), metaClient);
+ assertEquals(newCommitTime, archived.getInstantTime());
+ assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
+ assertDoesNotThrow(() -> HoodieReplaceCommitMetadata.fromBytes(archived.getMetadata().array(), HoodieReplaceCommitMetadata.class));
+ assertDoesNotThrow(() -> HoodieCommitMetadata.fromBytes(archived.getPlan().array(), HoodieCommitMetadata.class),
+ "Insert overwrite without clustering should have a plan");
+
+ String newCommitTime2 = HoodieTestTable.makeNewCommitTime();
+ createReplace(newCommitTime2, WriteOperationType.INSERT_OVERWRITE_TABLE, false);
+ // test conversion to archived instant
+ HoodieArchivedInstant archived2 = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime2), metaClient);
+ assertEquals(newCommitTime2, archived2.getInstantTime());
+ assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived2.getAction());
+ assertDoesNotThrow(() -> HoodieReplaceCommitMetadata.fromBytes(archived2.getMetadata().array(), HoodieReplaceCommitMetadata.class));
+ assertDoesNotThrow(() -> HoodieCommitMetadata.fromBytes(archived2.getPlan().array(), HoodieCommitMetadata.class),
+ "Insert overwrite table without clustering should have a plan");
+ }
+
+ @Test
+ public void testArchivedInsertOverwriteWithClustering() throws Exception {
+ String newCommitTime = HoodieTestTable.makeNewCommitTime();
+ createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE, true);
+ // test conversion to archived instant
+ HoodieArchivedInstant archived = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime), metaClient);
+ assertEquals(newCommitTime, archived.getInstantTime());
+ assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
+ assertDoesNotThrow(() -> HoodieReplaceCommitMetadata.fromBytes(archived.getMetadata().array(), HoodieReplaceCommitMetadata.class));
+ assertDoesNotThrow(() -> TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived.getPlan().array()));
+
+ String newCommitTime2 = HoodieTestTable.makeNewCommitTime();
+ createReplace(newCommitTime2, WriteOperationType.INSERT_OVERWRITE_TABLE, true);
+ // test conversion to archived instant
+ HoodieArchivedInstant archived2 = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime2), metaClient);
+ assertEquals(newCommitTime2, archived2.getInstantTime());
+ assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived2.getAction());
+ assertDoesNotThrow(() -> HoodieReplaceCommitMetadata.fromBytes(archived2.getMetadata().array(), HoodieReplaceCommitMetadata.class));
+ assertDoesNotThrow(() -> TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived2.getPlan().array()));
+ }
+
+ @Test
+ public void testArchivedCommit() throws Exception {
+ String newCommitTime = HoodieTestTable.makeNewCommitTime();
+ createCommitMetadata(newCommitTime);
+ HoodieArchivedInstant archived = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime), metaClient);
+ assertEquals(newCommitTime, archived.getInstantTime());
+ assertEquals(HoodieTimeline.COMMIT_ACTION, archived.getAction());
+ assertDoesNotThrow(() -> HoodieCommitMetadata.fromBytes(archived.getMetadata().array(), HoodieCommitMetadata.class));
+ }
+
+ @Test
+ public void testArchivedDeltaCommit() throws Exception {
+ String newCommitTime = HoodieTestTable.makeNewCommitTime();
+ createDeltaCommitMetadata(newCommitTime);
+ HoodieArchivedInstant archived = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime), metaClient);
+ assertEquals(newCommitTime, archived.getInstantTime());
+ assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, archived.getAction());
+ assertNull(archived.getMetadata());
+ }
+
+ @Test
+ public void testArchivedRollback() throws Exception {
+ String newCommitTime = HoodieTestTable.makeNewCommitTime();
+ createRollbackMetadata(newCommitTime);
+ HoodieArchivedInstant archived = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime), metaClient);
+ assertEquals(newCommitTime, archived.getInstantTime());
+ assertEquals(HoodieTimeline.ROLLBACK_ACTION, archived.getAction());
+ assertDoesNotThrow(() -> TimelineMetadataUtils.deserializeHoodieRollbackMetadata(archived.getMetadata().array()));
+ assertNull(archived.getPlan());
+ }
+
+ @Test
+ public void testArchivedCompaction() throws Exception {
+ String newCommitTime = HoodieTestTable.makeNewCommitTime();
+ createCompactionMetadata(newCommitTime);
+ HoodieArchivedInstant archived = MetadataConversionUtils.createArchivedInstant(getActiveInstant(newCommitTime), metaClient);
+ assertEquals(newCommitTime, archived.getInstantTime());
+ assertEquals(HoodieTimeline.COMMIT_ACTION, archived.getAction());
+ assertDoesNotThrow(() -> HoodieCommitMetadata.fromBytes(archived.getMetadata().array(), HoodieCommitMetadata.class));
+ assertDoesNotThrow(() -> CompactionUtils.getCompactionPlan(metaClient, Option.of(archived.getPlan().array())));
+ }
+
+ // -------------------------------------------------------------------------
+ // END: test cases for HoodieArchivedInstant conversion.
+ // -------------------------------------------------------------------------
+
private void createCompactionMetadata(String instantTime) throws Exception {
String fileId1 = "file-" + instantTime + "-1";
String fileId2 = "file-" + instantTime + "-2";
@@ -176,7 +303,7 @@ private void createCompactionMetadata(String instantTime) throws Exception {
commitMetadata.setOperationType(WriteOperationType.COMPACT);
commitMetadata.setCompacted(true);
HoodieTestTable.of(metaClient)
- .addCommit(instantTime, Option.of(commitMetadata))
+ .addCompaction(instantTime, commitMetadata)
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
}
@@ -273,4 +400,10 @@ HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEF
HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
}
+
+ private ActiveInstant getActiveInstant(String instantTime) {
+ HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
+ List instants = rawActiveTimeline.getInstantsAsStream().filter(instant -> instant.getTimestamp().equals(instantTime)).collect(Collectors.toList());
+ return ActiveInstant.fromInstants(instants);
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index be979c892f321..001e56d97bc1c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -524,14 +524,8 @@ public void testArchivalOnLogCompaction() throws Exception {
if (instants == null) {
continue;
}
- assertEquals(3, instants.size());
- for (HoodieInstant instant: instants) {
- if (instant.isCompleted()) {
- assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, instant.getAction());
- } else {
- assertEquals(HoodieTimeline.LOG_COMPACTION_ACTION, instant.getAction());
- }
- }
+ assertEquals(1, instants.size());
+ assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(0).getAction());
logCompactionInstantArchived = true;
}
assertTrue(logCompactionInstantArchived);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 5b8f7cd20c71f..5f957c09912a3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -33,12 +33,12 @@
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -62,7 +62,6 @@
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
@@ -105,9 +104,9 @@
import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@@ -135,7 +134,7 @@ public void init(HoodieTableType tableType) throws Exception {
hadoopConf.addResource(wrapperFs.getConf());
}
- private void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) throws IOException {
+ private void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) {
if (enableMetadataTable) {
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context);
// reload because table configs could have been updated
@@ -162,7 +161,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
- maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200,
+ maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, 10,
HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
}
@@ -172,7 +171,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
- 5, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200,
+ 5, maxDeltaCommitsMetadataTable, tableType, 10,
HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
}
@@ -180,11 +179,9 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int minArchivalCommits,
int maxArchivalCommits,
int maxDeltaCommitsMetadataTable,
- boolean enableArchiveMerge,
- int archiveFilesBatch,
- long size) throws Exception {
+ int archiveFilesBatch) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, 5,
- maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size,
+ maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, archiveFilesBatch,
HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
}
@@ -194,9 +191,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int maxDeltaCommits,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType,
- boolean enableArchiveMerge,
int archiveFilesBatch,
- long size,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
WriteConcurrencyMode writeConcurrencyMode) throws Exception {
return initTestTableAndGetWriteConfig(
@@ -206,9 +201,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
maxDeltaCommits,
maxDeltaCommitsMetadataTable,
tableType,
- enableArchiveMerge,
archiveFilesBatch,
- size,
failedWritesCleaningPolicy,
writeConcurrencyMode,
ARCHIVE_BEYOND_SAVEPOINT.defaultValue());
@@ -220,9 +213,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int maxDeltaCommits,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType,
- boolean enableArchiveMerge,
int archiveFilesBatch,
- long size,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
WriteConcurrencyMode writeConcurrencyMode,
boolean archiveProceedBeyondSavepoints) throws Exception {
@@ -231,9 +222,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).withFailedWritesCleaningPolicy(failedWritesCleaningPolicy).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
- .withArchiveMergeEnable(enableArchiveMerge)
.withArchiveMergeFilesBatchSize(archiveFilesBatch)
- .withArchiveMergeSmallFileLimit(size)
.archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
.withArchiveBeyondSavepoint(archiveProceedBeyondSavepoints).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -454,7 +443,7 @@ protected static HoodieCommitMetadata generateCommitMetadata(
public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
boolean enableMetadata = false;
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2, HoodieTableType.COPY_ON_WRITE,
- false, 10, 209715200, HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint);
+ 10, HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint);
// min archival commits is 2 and max archival commits is 4. and so, after 5th commit, 3 commits will be archived.
for (int i = 1; i < 5; i++) {
@@ -502,10 +491,9 @@ public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exc
getActiveCommitInstants(Arrays.asList("00000008", "00000009")), commitsAfterArchival);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception {
- HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, enableArchiveMerge, 3, 209715200);
+ @Test
+ public void testCompactionWithCorruptManifestFile() throws Exception {
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 10; i++) {
@@ -513,27 +501,23 @@ public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableA
archiveAndGetCommitsList(writeConfig);
}
- // build a merge small archive plan with dummy content
+ // build a compaction archive plan with dummy content
// this plan can not be deserialized.
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
- List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
-
- archiver.reOpenWriter();
- Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
- archiver.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1");
- String s = "Dummy Content";
- // stain the current merge plan file.
- FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes()));
-
- // check that damaged plan file will not block archived timeline loading.
+ List files = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
+ files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
+
+ archiver.updateManifest("dummy.parquet");
+ // remove the version file created by the last manifest update
+ metaClient.getFs().delete(HoodieArchivedTimeline.getVersionFilePath(metaClient, HoodieArchivedTimeline.latestSnapshotVersion(metaClient)));
+
+ // check that invalid manifest file will not block archived timeline loading.
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
- assertEquals(9 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
+ assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
- // trigger several archive after left damaged merge small archive file plan.
+ // trigger several archive with the invalid manifest file.
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
@@ -544,19 +528,18 @@ public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableA
HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
// check instant number
- assertEquals(18 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
+ assertEquals(4 * 3 + 14, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
- // if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline.
- Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
- FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
+ // if there are damaged archive files and damaged plan, hoodie can still load correctly.
+ Path damagedFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet");
+ FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of("dummy".getBytes()));
- assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
+ assertDoesNotThrow(() -> metaClient.getArchivedTimeline().reload(), "Archived timeline can skip the invalid data and manifest files smartly");
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testMergeSmallArchiveFilesRecoverFromMergeFailed(boolean enableArchiveMerge) throws Exception {
- HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, enableArchiveMerge, 3, 209715200);
+ @Test
+ public void testCompactionRecoverWithoutManifestFile() throws Exception {
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 10; i++) {
@@ -567,121 +550,61 @@ public void testMergeSmallArchiveFilesRecoverFromMergeFailed(boolean enableArchi
// do a single merge small archive files
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
- List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
- archiver.reOpenWriter();
+ List candidateFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
+ candidateFiles.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
- archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
- archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
- HoodieLogFormat.Writer writer = archiver.reOpenWriter();
+ String compactedFileName = HoodieTimelineArchiver.compactedFileName(candidateFiles);
+ archiver.compactArchiveFiles(candidateFiles, compactedFileName);
// check loading archived and active timeline success
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
- assertEquals(9 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.reload().countInstants());
-
- String s = "Dummy Content";
- // stain the current merged archive file.
- FileIOUtils.createFileInPath(metaClient.getFs(), writer.getLogFile().getPath(), Option.of(s.getBytes()));
-
- // do another archive actions with merge small archive files.
- for (int i = 1; i < 10; i++) {
- testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
- archiveAndGetCommitsList(writeConfig);
- }
-
- // check result.
- // we need to load archived timeline successfully and ignore the parsing damage merged archive files exception.
- HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
- HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
-
- assertEquals(18 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
-
- // if there are a damaged merged archive files and other common damaged archive file.
- // hoodie need throw ioe while loading archived timeline because of parsing the damaged archive file.
- Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
- FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
-
- assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
+ assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.reload().countInstants());
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testMergeSmallArchiveFilesRecoverFromDeleteFailed(boolean enableArchiveMerge) throws Exception {
- HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, enableArchiveMerge, 3, 209715200);
+ @Test
+ public void testCompactionCleaning() throws Exception {
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);
// do ingestion and trigger archive actions here.
- for (int i = 1; i < 10; i++) {
+ for (int i = 1; i < 19; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
-
- // do a single merge small archive files
- HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
- HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
- List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
-
- archiver.reOpenWriter();
-
- archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
- archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
- archiver.reOpenWriter();
-
- // delete only one of the small archive file to simulate delete action failed.
- metaClient.getFs().delete(fsStatuses[0].getPath());
+ // now we have version 6, 7, 8, 9 version of snapshots
// loading archived timeline and active timeline success
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
- HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
- assertEquals(9 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
-
- // do another archive actions with merge small archive files.
- for (int i = 1; i < 10; i++) {
- testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
- archiveAndGetCommitsList(writeConfig);
- }
+ HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline();
+ assertEquals(4 * 3 + 14, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
- // check result.
- HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
- HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
-
- assertEquals(18 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
+ assertEquals(9, HoodieArchivedTimeline.latestSnapshotVersion(metaClient));
+ assertEquals(Arrays.asList(6, 7, 8, 9), HoodieArchivedTimeline.allSnapshotVersions(metaClient).stream().sorted().collect(Collectors.toList()));
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerge) throws Exception {
- HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, enableArchiveMerge, 3, 209715200);
+ @Test
+ public void testReadArchivedCompactionPlan() throws Exception {
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 5, HoodieTableType.MERGE_ON_READ);
// do ingestion and trigger archive actions here.
- int numInstant = 12;
- for (int i = 1; i < 12; i++) {
- testTable.doWriteOperation(
- "000000" + String.format("%02d", i),
- WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
- Arrays.asList("p1", "p2"),
- 2);
+ for (int i = 1; i < 11; i += 2) {
+ testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+ testTable.doCompaction(String.format("%08d", (i + 1)), Arrays.asList("p1", "p2"));
archiveAndGetCommitsList(writeConfig);
}
- Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
- String s = "Dummy Content";
- // stain the current merge plan file.
- FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes()));
-
- // check that damaged plan file will not block archived timeline loading.
- HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
- HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
- assertEquals((numInstant - 1) * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
-
- // if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline.
- Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
- FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
-
- assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
+ // loading archived timeline instants
+ HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline();
+ // load instant details
+ archivedTimeLine.loadCompactionDetailsInMemory("00000001", "00000011");
+ List compactionInstants = archivedTimeLine.getCommitTimeline().getInstants();
+ assertEquals(2, compactionInstants.size(), "Two compactions instants should be archived.");
+ List
+ * Keeps only 3 valid snapshot versions for the reader, that means, a file is kept for at lest 3 archival trigger interval, for default configuration, it is 30 instants time span,
+ * which is far longer that the archived timeline loading time.
+ *
+ *
Instants TTL
+ * The timeline reader only reads instants of last 7 days. We will skip the instants from archived timeline that are generated 1 week ago.
+ *
+ *
Timeline Refresh
+ *
Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call
+ * #reload().
+ *
+ *
This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
*/
public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
- public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan";
+ private static final String VERSION_FILE_PREFIX = "_version_"; // _version_[N]
+ private static final String MANIFEST_FILE_PREFIX = "manifest_"; // manifest_[N]
+ public static final int FILE_LAYER_ZERO = 0;
private static final Pattern ARCHIVE_FILE_PATTERN =
- Pattern.compile("^\\.commits_\\.archive\\.([0-9]+).*");
-
- private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits";
- private static final String ACTION_TYPE_KEY = "actionType";
- private static final String ACTION_STATE = "actionState";
- private static final String STATE_TRANSITION_TIME = "stateTransitionTime";
+ Pattern.compile("^(\\d+)_(\\d+)_(\\d)\\.parquet");
+ private static final String INSTANT_TIME_ARCHIVED_META_FIELD = "instantTime";
+ private static final String COMPLETION_TIME_ARCHIVED_META_FIELD = "completionTime";
+ private static final String ACTION_ARCHIVED_META_FIELD = "action";
+ private static final String METADATA_ARCHIVED_META_FIELD = "metadata";
+ private static final String PLAN_ARCHIVED_META_FIELD = "plan";
private HoodieTableMetaClient metaClient;
- private final Map readCommits = new HashMap<>();
+ private final Map readCommits = new ConcurrentHashMap<>();
private static final Logger LOG = LoggerFactory.getLogger(HoodieArchivedTimeline.class);
@@ -97,7 +147,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
*/
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
this.metaClient = metaClient;
- setInstants(this.loadInstants(false));
+ setInstants(this.loadInstants());
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function> & Serializable) this::getInstantDetails;
@@ -109,8 +159,7 @@ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
*/
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String startTs) {
this.metaClient = metaClient;
- setInstants(loadInstants(new StartTsFilter(startTs), true,
- record -> HoodieInstant.State.COMPLETED.toString().equals(record.get(ACTION_STATE).toString())));
+ setInstants(loadInstants(new StartTsFilter(startTs), LoadMode.COMMIT_META));
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function> & Serializable) this::getInstantDetails;
@@ -133,21 +182,12 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
in.defaultReadObject();
}
- public static Path getArchiveLogPath(String archiveFolder) {
- return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX);
- }
-
public void loadInstantDetailsInMemory(String startTs, String endTs) {
loadInstants(startTs, endTs);
}
public void loadCompletedInstantDetailsInMemory() {
- loadInstants(null, true,
- record -> {
- // Very old archived instants don't have action state set.
- Object action = record.get(ACTION_STATE);
- return action == null || HoodieInstant.State.COMPLETED.toString().equals(action.toString());
- });
+ loadInstants(null, LoadMode.COMMIT_META);
}
public void loadCompactionDetailsInMemory(String compactionInstantTime) {
@@ -156,13 +196,9 @@ public void loadCompactionDetailsInMemory(String compactionInstantTime) {
public void loadCompactionDetailsInMemory(String startTs, String endTs) {
// load compactionPlan
- loadInstants(new TimeRangeFilter(startTs, endTs), true,
- record -> {
- // Older files don't have action state set.
- Object action = record.get(ACTION_STATE);
- return record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
- && (action == null || HoodieInstant.State.INFLIGHT.toString().equals(action.toString()));
- }
+ loadInstants(new TimeRangeFilter(startTs, endTs), LoadMode.PLAN,
+ record -> record.get(ACTION_ARCHIVED_META_FIELD).toString().equals(HoodieTimeline.COMMIT_ACTION)
+ && record.get(PLAN_ARCHIVED_META_FIELD) != null
);
}
@@ -184,146 +220,306 @@ public HoodieArchivedTimeline reload() {
return new HoodieArchivedTimeline(metaClient);
}
- private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
- final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
- final String action = record.get(ACTION_TYPE_KEY).toString();
- final String stateTransitionTime = (String) record.get(STATE_TRANSITION_TIME);
- if (loadDetails) {
- getMetadataKey(action).map(key -> {
- Object actionData = record.get(key);
- if (actionData != null) {
- if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
- this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData));
- } else {
- this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8));
- }
+ private HoodieInstant readCommit(GenericRecord record, LoadMode loadMode) {
+ final String instantTime = record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
+ final String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
+ final String completionTime = record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
+ loadInstantDetails(record, instantTime, loadMode);
+ return new HoodieInstant(HoodieInstant.State.COMPLETED, action, instantTime, completionTime);
+ }
+
+ private void loadInstantDetails(GenericRecord record, String instantTime, LoadMode loadMode) {
+ switch (loadMode) {
+ case COMMIT_META:
+ ByteBuffer commitMeta = (ByteBuffer) record.get(METADATA_ARCHIVED_META_FIELD);
+ if (commitMeta != null) {
+ // in case the entry comes from an empty completed meta file
+ this.readCommits.put(instantTime, commitMeta.array());
}
- return null;
- });
- }
- return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action,
- instantTime, stateTransitionTime);
- }
-
- @Nonnull
- private Option getMetadataKey(String action) {
- switch (action) {
- case HoodieTimeline.CLEAN_ACTION:
- return Option.of("hoodieCleanMetadata");
- case HoodieTimeline.COMMIT_ACTION:
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- return Option.of("hoodieCommitMetadata");
- case HoodieTimeline.ROLLBACK_ACTION:
- return Option.of("hoodieRollbackMetadata");
- case HoodieTimeline.SAVEPOINT_ACTION:
- return Option.of("hoodieSavePointMetadata");
- case HoodieTimeline.COMPACTION_ACTION:
- case HoodieTimeline.LOG_COMPACTION_ACTION:
- return Option.of("hoodieCompactionPlan");
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
- return Option.of("hoodieReplaceCommitMetadata");
- case HoodieTimeline.INDEXING_ACTION:
- return Option.of("hoodieIndexCommitMetadata");
+ break;
+ case PLAN:
+ ByteBuffer plan = (ByteBuffer) record.get(PLAN_ARCHIVED_META_FIELD);
+ if (plan != null) {
+ // in case the entry comes from an empty completed meta file
+ this.readCommits.put(instantTime, plan.array());
+ }
+ break;
default:
- LOG.error(String.format("Unknown action in metadata (%s)", action));
- return Option.empty();
+ // no operation
}
}
- private List loadInstants(boolean loadInstantDetails) {
- return loadInstants(null, loadInstantDetails);
+ private List loadInstants() {
+ return loadInstants(null, LoadMode.SLIM);
}
private List loadInstants(String startTs, String endTs) {
- return loadInstants(new TimeRangeFilter(startTs, endTs), true);
+ return loadInstants(new TimeRangeFilter(startTs, endTs), LoadMode.COMMIT_META);
}
- private List loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
- return loadInstants(filter, loadInstantDetails, record -> true);
+ private List loadInstants(TimeRangeFilter filter, LoadMode loadMode) {
+ return loadInstants(filter, loadMode, r -> true);
}
/**
* This is method to read selected instants. Do NOT use this directly use one of the helper methods above
* If loadInstantDetails is set to true, this would also update 'readCommits' map with commit details
* If filter is specified, only the filtered instants are loaded
- * If commitsFilter is specified, only the filtered records are loaded
+ * If commitsFilter is specified, only the filtered records are loaded.
*/
- private List loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
- Function commitsFilter) {
+ private List loadInstants(
+ @Nullable TimeRangeFilter filter,
+ LoadMode loadMode,
+ Function commitsFilter) {
try {
// List all files
- FileStatus[] fsStatuses = metaClient.getFs().globStatus(
- new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
-
- // Sort files by version suffix in reverse (implies reverse chronological order)
- Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
-
- Set instantsInRange = new HashSet<>();
- for (FileStatus fs : fsStatuses) {
- // Read the archived file
- try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
- new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
- int instantsInPreviousFile = instantsInRange.size();
- // Read the avro blocks
- while (reader.hasNext()) {
- HoodieLogBlock block = reader.next();
- if (block instanceof HoodieAvroDataBlock) {
- HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
- // TODO If we can store additional metadata in datablock, we can skip parsing records
- // (such as startTime, endTime of records in the block)
- try (ClosableIterator> itr = avroBlock.getRecordIterator(HoodieRecordType.AVRO)) {
- StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
- // Filter blocks in desired time window
- .map(r -> (GenericRecord) r.getData())
- .filter(commitsFilter::apply)
- .map(r -> readCommit(r, loadInstantDetails))
- .filter(c -> filter == null || filter.isInRange(c))
- .forEach(instantsInRange::add);
- }
- }
- }
-
- if (filter != null) {
- int instantsInCurrentFile = instantsInRange.size() - instantsInPreviousFile;
- if (instantsInPreviousFile > 0 && instantsInCurrentFile == 0) {
- // Note that this is an optimization to skip reading unnecessary archived files
- // This signals we crossed lower bound of desired time window.
- break;
- }
- }
- } catch (Exception originalException) {
- // merge small archive files may left uncompleted archive file which will cause exception.
- // need to ignore this kind of exception here.
- try {
- Path planPath = new Path(metaClient.getArchivePath(), MERGE_ARCHIVE_PLAN_NAME);
- HoodieWrapperFileSystem fileSystem = metaClient.getFs();
- if (fileSystem.exists(planPath)) {
- HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
- String mergedArchiveFileName = plan.getMergedArchiveFileName();
- if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fs.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
- LOG.warn("Catch exception because of reading uncompleted merging archive file " + mergedArchiveFileName + ". Ignore it here.");
- continue;
- }
- }
- throw originalException;
- } catch (Exception e) {
- // If anything wrong during parsing merge archive plan, we need to throw the original exception.
- // For example corrupted archive file and corrupted plan are both existed.
- throw originalException;
- }
- }
- }
-
- ArrayList result = new ArrayList<>(instantsInRange);
+ List fileNames = latestSnapshotFiles(metaClient);
+
+ Map instantsInRange = new ConcurrentHashMap<>();
+ Schema readSchema = getReadSchema(loadMode);
+ fileNames.stream()
+ .filter(fileName -> filter == null || isFileInRange(filter, fileName))
+ .parallel().forEach(fileName -> {
+ // Read the archived file
+ try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ .getFileReader(metaClient.getHadoopConf(), new Path(metaClient.getArchivePath(), fileName))) {
+ try (ClosableIterator iterator = reader.getIndexedRecordIterator(HoodieArchivedInstant.getClassSchema(), readSchema)) {
+ while (iterator.hasNext()) {
+ GenericRecord record = (GenericRecord) iterator.next();
+ String instantTime = record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
+ if (!instantsInRange.containsKey(instantTime)
+ && (filter == null || filter.isInRange(instantTime))
+ && commitsFilter.apply(record)) {
+ HoodieInstant instant = readCommit(record, loadMode);
+ instantsInRange.put(instantTime, instant);
+ }
+ }
+ }
+ } catch (IOException ioException) {
+ throw new HoodieIOException("Error open file reader for path: " + new Path(metaClient.getArchivePath(), fileName));
+ }
+ });
+
+ ArrayList result = new ArrayList<>(instantsInRange.values());
Collections.sort(result);
return result;
} catch (IOException e) {
throw new HoodieIOException(
- "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
+ "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
+ }
+ }
+
+ @Override
+ public HoodieDefaultTimeline getWriteTimeline() {
+ // filter in-memory instants
+ Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
+ return new HoodieDefaultTimeline(getInstantsAsStream().filter(i ->
+ readCommits.containsKey(i.getTimestamp()))
+ .filter(s -> validActions.contains(s.getAction())), details);
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+ private static Schema getReadSchema(LoadMode loadMode) {
+ switch (loadMode) {
+ case SLIM:
+ return ArchivedInstantReadSchemas.SLIM_SCHEMA;
+ case COMMIT_META:
+ return ArchivedInstantReadSchemas.SCHEMA_WITH_COMMIT_META;
+ case PLAN:
+ return ArchivedInstantReadSchemas.SCHEMA_WITH_PLAN;
+ default:
+ throw new AssertionError("Unexpected");
+ }
+ }
+
+ /**
+ * Returns whether the given file is located in the filter.
+ */
+ private static boolean isFileInRange(TimeRangeFilter filter, String fileName) {
+ String minInstant = getMinInstantTime(fileName);
+ String maxInstant = getMaxInstantTime(fileName);
+ return filter.isInRange(minInstant) || filter.isInRange(maxInstant);
+ }
+
+ /**
+ * Returns the latest snapshot version.
+ */
+ public static int latestSnapshotVersion(HoodieTableMetaClient metaClient) throws IOException {
+ return allSnapshotVersions(metaClient).stream().max(Integer::compareTo).orElse(-1);
+ }
+
+ /**
+ * Returns all the valid snapshot versions.
+ */
+ public static List allSnapshotVersions(HoodieTableMetaClient metaClient) throws IOException {
+ return Arrays.stream(metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter()))
+ .map(fileStatus -> fileStatus.getPath().getName())
+ .map(HoodieArchivedTimeline::getSnapshotVersion)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Returns the latest snapshot metadata files.
+ */
+ public static List latestSnapshotFiles(HoodieTableMetaClient metaClient) throws IOException {
+ int latestVersion = latestSnapshotVersion(metaClient);
+ return latestSnapshotFiles(metaClient, latestVersion);
+ }
+
+ /**
+ * Reads the file list from the manifest file for the latest snapshot.
+ */
+ public static List latestSnapshotFiles(HoodieTableMetaClient metaClient, int latestVersion) {
+ if (latestVersion < 0) {
+ // there is no valid snapshot of the timeline.
+ return Collections.emptyList();
+ }
+ // read and deserialize the valid files.
+ byte[] content = FileIOUtils.readDataFromPath(metaClient.getFs(), getManifestFilePath(metaClient, latestVersion)).get();
+ return Arrays.stream(new String(content).split(",")).collect(Collectors.toList());
+ }
+
+ /**
+ * Returns the full manifest file path with given version number.
+ */
+ public static Path getManifestFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) {
+ return new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + snapshotVersion);
+ }
+
+ /**
+ * Returns the full version file path with given version number.
+ */
+ public static Path getVersionFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) {
+ return new Path(metaClient.getArchivePath(), VERSION_FILE_PREFIX + snapshotVersion);
+ }
+
+ /**
+ * List all the version files.
+ */
+ public static FileStatus[] listAllVersionFiles(HoodieTableMetaClient metaClient) throws IOException {
+ return metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter());
+ }
+
+ /**
+ * List all the parquet manifest files.
+ */
+ public static FileStatus[] listAllManifestFiles(HoodieTableMetaClient metaClient) throws IOException {
+ return metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getManifestFilePathFilter());
+ }
+
+ /**
+ * List all the parquet metadata files.
+ */
+ public static FileStatus[] listAllMetaFiles(HoodieTableMetaClient metaClient) throws IOException {
+ return metaClient.getFs().globStatus(
+ new Path(metaClient.getArchivePath() + "/*.parquet"));
+ }
+
+ /**
+ * Parse the snapshot version from the version file name.
+ */
+ public static int getSnapshotVersion(String fileName) {
+ return Integer.parseInt(fileName.split("_")[2]);
+ }
+
+ /**
+ * Parse the snapshot version from the manifest file name.
+ */
+ public static int getManifestVersion(String fileName) {
+ return Integer.parseInt(fileName.split("_")[1]);
+ }
+
+ /**
+ * Parse the layer number from the file name.
+ */
+ public static int getFileLayer(String fileName) {
+ try {
+ Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+ if (fileMatcher.matches()) {
+ return Integer.parseInt(fileMatcher.group(3));
+ }
+ } catch (NumberFormatException e) {
+ // log and ignore any format warnings
+ LOG.warn("error getting file layout for archived file: " + fileName);
+ }
+
+ // return default value in case of any errors
+ return FILE_LAYER_ZERO;
+ }
+
+ /**
+ * Parse the minimum instant time from the file name.
+ */
+ public static String getMinInstantTime(String fileName) {
+ Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+ if (fileMatcher.matches()) {
+ return fileMatcher.group(1);
+ } else {
+ throw new HoodieException("Unexpected archival file name: " + fileName);
+ }
+ }
+
+ /**
+ * Parse the maximum instant time from the file name.
+ */
+ public static String getMaxInstantTime(String fileName) {
+ Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+ if (fileMatcher.matches()) {
+ return fileMatcher.group(2);
+ } else {
+ throw new HoodieException("Unexpected archival file name: " + fileName);
}
}
- private static class TimeRangeFilter {
+ /**
+ * Returns whether a file belongs to the specified layer {@code layer} within the LSM layout.
+ */
+ public static boolean isFileFromLayer(String fileName, int layer) {
+ return getFileLayer(fileName) == layer;
+ }
+
+ /**
+ * Returns a path filter for the version pointer files.
+ */
+ public static PathFilter getVersionFilePathFilter() {
+ return path -> path.getName().startsWith(VERSION_FILE_PREFIX);
+ }
+
+ /**
+ * Returns a path filter for the manifest files.
+ */
+ public static PathFilter getManifestFilePathFilter() {
+ return path -> path.getName().startsWith(MANIFEST_FILE_PREFIX);
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /**
+ * Different mode for loading the archived instant metadata.
+ */
+ private enum LoadMode {
+ /**
+ * Loads the instantTime, completionTime, action.
+ */
+ SLIM,
+ /**
+ * Loads the instantTime, completionTime, action, metadata.
+ */
+ COMMIT_META,
+ /**
+ * Loads the instantTime, completionTime, plan.
+ */
+ PLAN
+ }
+
+ /**
+ * A time based filter with range (startTs, endTs].
+ */
+ public static class TimeRangeFilter {
private final String startTs;
private final String endTs;
@@ -332,11 +528,14 @@ public TimeRangeFilter(String startTs, String endTs) {
this.endTs = endTs;
}
- public boolean isInRange(HoodieInstant instant) {
- return HoodieTimeline.isInRange(instant.getTimestamp(), this.startTs, this.endTs);
+ public boolean isInRange(String instantTime) {
+ return HoodieTimeline.isInRange(instantTime, this.startTs, this.endTs);
}
}
+ /**
+ * A time based filter with range [startTs, +∞).
+ */
private static class StartTsFilter extends TimeRangeFilter {
private final String startTs;
@@ -345,42 +544,18 @@ public StartTsFilter(String startTs) {
this.startTs = startTs;
}
- public boolean isInRange(HoodieInstant instant) {
- return HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN_OR_EQUALS, startTs);
+ public boolean isInRange(String instantTime) {
+ return HoodieTimeline.compareTimestamps(instantTime, GREATER_THAN_OR_EQUALS, startTs);
}
}
/**
- * Sort files by reverse order of version suffix in file name.
+ * Sort files by order of min instant time in file name.
*/
- public static class ArchiveFileVersionComparator implements Comparator, Serializable {
+ public static class ArchiveParquetVersionComparator implements Comparator, Serializable {
@Override
- public int compare(FileStatus f1, FileStatus f2) {
- return Integer.compare(getArchivedFileSuffix(f2), getArchivedFileSuffix(f1));
+ public int compare(String f1, String f2) {
+ return f1.compareTo(f2);
}
-
- private int getArchivedFileSuffix(FileStatus f) {
- try {
- Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(f.getPath().getName());
- if (fileMatcher.matches()) {
- return Integer.parseInt(fileMatcher.group(1));
- }
- } catch (NumberFormatException e) {
- // log and ignore any format warnings
- LOG.warn("error getting suffix for archived file: " + f.getPath());
- }
-
- // return default value in case of any errors
- return 0;
- }
- }
-
- @Override
- public HoodieDefaultTimeline getWriteTimeline() {
- // filter in-memory instants
- Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
- return new HoodieDefaultTimeline(getInstantsAsStream().filter(i ->
- readCommits.containsKey(i.getTimestamp()))
- .filter(s -> validActions.contains(s.getAction())), details);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 901530b11d6ed..787740f6c15ee 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -49,7 +49,7 @@ public class HoodieInstant implements Serializable, Comparable {
/**
* A COMPACTION action eventually becomes COMMIT when completed. So, when grouping instants
- * for state transitions, this needs to be taken into account
+ * for state transitions, this needs to be taken into account.
*/
private static final Map COMPARABLE_ACTIONS = createComparableActionsMap();
@@ -240,7 +240,7 @@ public String getFileName() {
throw new IllegalArgumentException("Cannot get file name for unknown action " + action);
}
- private static final Map createComparableActionsMap() {
+ private static Map createComparableActionsMap() {
Map comparableMap = new HashMap<>();
comparableMap.put(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.COMMIT_ACTION);
comparableMap.put(HoodieTimeline.LOG_COMPACTION_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedInstantReadSchemas.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedInstantReadSchemas.java
new file mode 100644
index 0000000000000..b198c6cb1a650
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedInstantReadSchemas.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.Schema;
+
+/**
+ * Avro schema for different archived instant read cases.
+ */
+public abstract class ArchivedInstantReadSchemas {
+ public static final Schema SLIM_SCHEMA = new Schema.Parser().parse("{\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"HoodieArchivedMetaEntryV2\",\n"
+ + " \"namespace\":\"org.apache.hudi.avro.model\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"instantTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"completionTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"action\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " }\n"
+ + " ]\n"
+ + "}");
+
+ public static final Schema SCHEMA_WITH_COMMIT_META = new Schema.Parser().parse("{\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"HoodieArchivedMetaEntryV2\",\n"
+ + " \"namespace\":\"org.apache.hudi.avro.model\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"instantTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"completionTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"action\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"metadata\",\n"
+ + " \"type\":[\"null\", \"bytes\"],\n"
+ + " \"default\": null\n"
+ + " }\n"
+ + " ]\n"
+ + "}");
+
+ public static final Schema SCHEMA_WITH_PLAN = new Schema.Parser().parse("{\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"HoodieArchivedMetaEntryV2\",\n"
+ + " \"namespace\":\"org.apache.hudi.avro.model\",\n"
+ + " \"fields\":[\n"
+ + " {\n"
+ + " \"name\":\"instantTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"completionTime\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"action\",\n"
+ + " \"type\":[\"null\",\"string\"],\n"
+ + " \"default\": null\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\":\"plan\",\n"
+ + " \"type\":[\"null\", \"bytes\"],\n"
+ + " \"default\": null\n"
+ + " }\n"
+ + " ]\n"
+ + "}");
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
index af65bac055c30..b15ce11fd530e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
@@ -18,13 +18,14 @@
package org.apache.hudi.io.storage;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
import java.io.IOException;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
@@ -44,5 +45,5 @@ protected ClosableIterator getIndexedRecordIterator(Schema reader
return getIndexedRecordIterator(readerSchema, readerSchema);
}
- protected abstract ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
+ public abstract ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
index 3d6533a342919..26a4001039ec8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
@@ -192,7 +192,7 @@ public Set filterRowKeys(Set candidateRowKeys) {
}
@Override
- protected ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
+ public ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
if (!Objects.equals(readerSchema, requestedSchema)) {
throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
index 1420424a58b01..00ba9fc3bb065 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
@@ -72,7 +72,7 @@ public Set filterRowKeys(Set candidateRowKeys) {
}
@Override
- protected ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
+ public ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
if (!Objects.equals(readerSchema, requestedSchema)) {
throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index ad4d1f16a60ce..3dd070fa0a7f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -95,7 +95,7 @@ protected ClosableIterator getIndexedRecordIterator(Schema schema
}
@Override
- protected ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+ public ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
return getIndexedRecordIteratorInternal(readerSchema, Option.of(requestedSchema));
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index d0f2ef025102f..4370d7b9c6722 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -26,7 +26,7 @@
import java.io.IOException;
import java.util.Properties;
-public interface HoodieFileWriter {
+public interface HoodieFileWriter extends AutoCloseable {
boolean canWrite();
void writeWithMetadata(HoodieKey key, HoodieRecord record, Schema schema, Properties props) throws IOException;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 469b5c8bf393a..9cf590bfbca65 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -283,29 +283,16 @@ public static void createRestoreFile(String basePath, String instantTime, Hoodie
createMetaFile(basePath, instantTime, HoodieTimeline.RESTORE_ACTION, serializeRestoreMetadata(hoodieRestoreMetadata).get());
}
- private static void createAuxiliaryMetaFile(String basePath, String instantTime, String suffix) throws IOException {
- Path parentPath = Paths.get(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
- Files.createDirectories(parentPath);
- Path metaFilePath = parentPath.resolve(instantTime + suffix);
- if (Files.notExists(metaFilePath)) {
- Files.createFile(metaFilePath);
- }
- }
-
public static void createRequestedCompaction(String basePath, String instantTime) throws IOException {
- createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION);
+ createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION);
}
public static void createInflightCompaction(String basePath, String instantTime) throws IOException {
- createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
- }
-
- public static void createPendingInflightCompaction(String basePath, String instantTime) throws IOException {
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
}
public static void createInflightSavepoint(String basePath, String instantTime) throws IOException {
- createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION);
+ createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION);
}
public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 49095683a2bb5..16170ddfc9963 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -110,7 +110,6 @@
import static org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCompaction;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedRollbackFile;
@@ -230,6 +229,15 @@ public HoodieTestTable addSavepointCommit(String instantTime, HoodieSavepointMet
return this;
}
+ public HoodieCommitMetadata createCommitMetadata(String commitTime, WriteOperationType operationType,
+ List partitions, int filesPerPartition, boolean bootstrap) {
+ Map>> partitionToFilesNameLengthMap = getPartitionFiles(partitions,
+ filesPerPartition);
+ HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(operationType,
+ metaClient.getTableType(), commitTime, partitionToFilesNameLengthMap);
+ return createCommitMetadata(operationType, commitTime, testTableState, bootstrap);
+ }
+
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime,
HoodieTestTableState testTableState) {
String actionType = getCommitActionType(operationType, metaClient.getTableType());
@@ -459,7 +467,15 @@ public HoodieSavepointMetadata getSavepointMetadata(String instant, Map fileSlices = new ArrayList<>();
+ fileSlices.add(new FileSlice("par1", instantTime, "fg-1"));
+ fileSlices.add(new FileSlice("par2", instantTime, "fg-2"));
+ HoodieCompactionPlan compactionPlan = CompactionUtils
+ .buildFromFileSlices(fileSlices.stream().map(fs -> Pair.of(fs.getPartitionPath(), fs))
+ .collect(Collectors.toList()), Option.empty(), Option.empty());
+ HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
+ metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
+ TimelineMetadataUtils.serializeCompactionPlan(compactionPlan));
currentInstantTime = instantTime;
return this;
}
@@ -468,7 +484,8 @@ public HoodieTestTable addRequestedCompaction(String instantTime, HoodieCompacti
HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(compactionPlan));
- return addRequestedCompaction(instantTime);
+ currentInstantTime = instantTime;
+ return this;
}
public HoodieTestTable addRequestedCompaction(String instantTime, FileSlice... fileSlices) throws IOException {
@@ -493,9 +510,10 @@ public HoodieTestTable addInflightCompaction(String instantTime, HoodieCommitMet
}
public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception {
- createRequestedCompaction(basePath, instantTime);
- createInflightCompaction(basePath, instantTime);
- return addCommit(instantTime, Option.of(commitMetadata));
+ addInflightCompaction(instantTime, commitMetadata);
+ this.inflightCommits.remove(instantTime);
+ createCommit(basePath, instantTime, Option.of(commitMetadata));
+ return this;
}
public HoodieTestTable addDeletePartitionCommit(String instantTime, String partition, List fileIds) throws Exception {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala
new file mode 100644
index 0000000000000..6ac7cc057627d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DummyActiveInstant
+import org.apache.hudi.client.HoodieTimelineArchiver
+import org.apache.hudi.client.common.HoodieJavaEngineContext
+import org.apache.hudi.client.utils.ActiveInstant
+import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.table.timeline.{HoodieArchivedTimeline, HoodieInstant}
+import org.apache.hudi.common.testutils.{HoodieTestTable, HoodieTestUtils}
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.index.HoodieIndex.IndexType
+import org.apache.hudi.table.HoodieJavaTable
+import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
+
+import java.util
+import scala.collection.JavaConverters._
+
+object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase {
+
+ /**
+ * Java HotSpot(TM) 64-Bit Server VM 1.8.0_351-b10 on Mac OS X 13.4.1
+ * Apple M2
+ * pref load archived instants: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+ * ------------------------------------------------------------------------------------------------------------------------
+ * read shim instants 18 32 15 0.1 17914.8 1.0X
+ * read instants with commit metadata 19 25 5 0.1 19403.1 0.9X
+ */
+ private def readArchivedInstantsBenchmark(): Unit = {
+ withTempDir(f => {
+ val tableName = "testTable"
+ val tablePath = new Path(f.getCanonicalPath, tableName).toUri.toString
+ val metaClient = HoodieTestUtils.init(new Configuration(), tablePath, HoodieTableType.COPY_ON_WRITE, tableName)
+
+ val writeConfig = HoodieWriteConfig.newBuilder().withPath(tablePath)
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
+ .withMarkersType("DIRECT")
+ .build()
+ val engineContext = new HoodieJavaEngineContext(new Configuration())
+ val archiver = new HoodieTimelineArchiver(writeConfig, HoodieJavaTable.create(writeConfig, engineContext).asInstanceOf[HoodieJavaTable[HoodieAvroPayload]])
+
+ val startTs = System.currentTimeMillis()
+ val startInstant = startTs + 1 + ""
+ val commitsNum = 10000000
+ val batchSize = 2000
+ val instantBuffer = new util.ArrayList[ActiveInstant]()
+ for (i <- 1 to commitsNum) {
+ val instantTime = startTs + i + ""
+ val action = if (i % 2 == 0) "delta_commit" else "commit"
+ val instant = new HoodieInstant(HoodieInstant.State.COMPLETED, action, instantTime, instantTime + 1000)
+ val metadata = HoodieTestTable.of(metaClient).createCommitMetadata(instantTime, WriteOperationType.INSERT, util.Arrays.asList("par1", "par2"), 10, false).toJsonString.getBytes()
+ instantBuffer.add(new DummyActiveInstant(instant, metadata))
+ if (i % batchSize == 0) {
+ // archive 10 instants each time
+ archiver.archive(engineContext, instantBuffer)
+ archiver.compactAndClean(engineContext)
+ instantBuffer.clear()
+ }
+ }
+
+ val benchmark = new HoodieBenchmark("pref load archived instants", commitsNum, 3)
+ benchmark.addCase("read shim instants") { _ =>
+ new HoodieArchivedTimeline(metaClient)
+ }
+ benchmark.addCase("read instants with commit metadata") { _ =>
+ new HoodieArchivedTimeline(metaClient, startInstant)
+ }
+ benchmark.run()
+ val totalSize = HoodieArchivedTimeline.latestSnapshotFiles(metaClient).asScala
+ .map(name => metaClient.getFs.getFileStatus(new Path(metaClient.getArchivePath, name)).getLen)
+ .sum
+ println("Total file size in bytes: " + totalSize)
+ })
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ readArchivedInstantsBenchmark()
+ }
+}