Add snapshot isolation for Archived timeline
danny0405 committed Jul 19, 2023
1 parent bec09f4 commit e88d7dc
Showing 7 changed files with 257 additions and 323 deletions.
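
The diffs below add a versioned, manifest-based layout for the archived timeline; the changed tests exercise helpers such as HoodieArchivedTimeline.latestSnapshotVersion, latestSnapshotFiles, getVersionFilePath, and allSnapshotVersions. As a rough sketch of the reader-side idea only, not Hudi's actual implementation, and with the marker and manifest file names below being assumptions, a reader pins the newest complete snapshot version and lists files from that version's manifest, so concurrent compaction or cleaning of older archive files cannot break an in-flight load:

// Illustrative sketch only (assumed file names: "_version_N" marker, "manifest_N" listing).
// A reader resolves the newest complete snapshot and reads exactly that file set.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ArchivedSnapshotReaderSketch {
  static List<Path> latestSnapshotFiles(Path archiveDir) throws IOException {
    try (Stream<Path> entries = Files.list(archiveDir)) {
      OptionalInt latest = entries
          .map(p -> p.getFileName().toString())
          .filter(n -> n.startsWith("_version_"))   // marker is written last, after the manifest
          .mapToInt(n -> Integer.parseInt(n.substring("_version_".length())))
          .max();
      if (!latest.isPresent()) {
        return Collections.emptyList();             // no complete snapshot published yet
      }
      Path manifest = archiveDir.resolve("manifest_" + latest.getAsInt());
      return Files.readAllLines(manifest).stream()  // one archived parquet file name per line
          .map(archiveDir::resolve)
          .collect(Collectors.toList());
    }
  }
}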

Large diffs are not rendered by default.

@@ -42,7 +42,7 @@ public class InstantTriple implements Serializable, Comparable<InstantTriple> {
/**
* The constructor.
*/
private InstantTriple(@Nullable HoodieInstant requested, @Nullable HoodieInstant inflight, HoodieInstant completed) {
protected InstantTriple(@Nullable HoodieInstant requested, @Nullable HoodieInstant inflight, HoodieInstant completed) {
this.requested = requested;
this.inflight = inflight;
this.completed = completed;
@@ -91,8 +91,12 @@ public String getCompletionTime() {
return this.completed.getStateTransitionTime();
}

public byte[] getCommitMetadata(HoodieTableMetaClient metaClient) {
return metaClient.getActiveTimeline().getInstantDetails(this.completed).get();
public Option<byte[]> getCommitMetadata(HoodieTableMetaClient metaClient) {
Option<byte[]> content = metaClient.getActiveTimeline().getInstantDetails(this.completed);
if (content.isPresent() && content.get().length == 0) {
return Option.empty();
}
return content;
}

public Option<byte[]> getRequestedCommitMetadata(HoodieTableMetaClient metaClient) {
@@ -136,14 +136,12 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInst
return archivedMetaWrapper;
}

public static HoodieArchivedMetaEntryV2 createArchivedMetaEntry(InstantTriple triple, HoodieTableMetaClient metaClient, boolean validCommit) {
public static HoodieArchivedMetaEntryV2 createArchivedMetaEntry(InstantTriple triple, HoodieTableMetaClient metaClient) {
HoodieArchivedMetaEntryV2 metaEntryV2 = new HoodieArchivedMetaEntryV2();
metaEntryV2.setInstantTime(triple.getInstantTime());
metaEntryV2.setCompletionTime(triple.getCompletionTime());
metaEntryV2.setAction(triple.getAction());
if (validCommit) {
metaEntryV2.setMetadata(ByteBuffer.wrap(triple.getCommitMetadata(metaClient)));
}
triple.getCommitMetadata(metaClient).ifPresent(commitMetadata -> metaEntryV2.setMetadata(ByteBuffer.wrap(commitMetadata)));
switch (triple.getAction()) {
case HoodieTimeline.CLEAN_ACTION: {
metaEntryV2.setPlan(ByteBuffer.wrap(triple.getCleanPlan(metaClient)));
@@ -152,10 +150,7 @@ public static HoodieArchivedMetaEntryV2 createArchivedMetaEntry(InstantTriple tr
case HoodieTimeline.REPLACE_COMMIT_ACTION: {
// we may have cases with empty HoodieRequestedReplaceMetadata e.g. insert_overwrite_table or insert_overwrite
// without clustering. However, we should revisit the requested commit file standardization
Option<byte[]> requestedReplaceMetadata = triple.getRequestedCommitMetadata(metaClient);
if (requestedReplaceMetadata.isPresent()) {
metaEntryV2.setPlan(ByteBuffer.wrap(requestedReplaceMetadata.get()));
}
triple.getRequestedCommitMetadata(metaClient).ifPresent(metadata -> metaEntryV2.setPlan(ByteBuffer.wrap(metadata)));
break;
}
case HoodieTimeline.COMPACTION_ACTION: {
@@ -0,0 +1,28 @@
package org.apache.hudi;

import org.apache.hudi.client.utils.InstantTriple;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

/**
* Instant triple for testing.
*/
public class DummyInstantTriple extends InstantTriple {
private final byte[] commitMetadata;

/**
* Only for testing purpose.
*/
public DummyInstantTriple(HoodieInstant completed, byte[] commitMetadata) {
super(new HoodieInstant(HoodieInstant.State.REQUESTED, completed.getAction(), completed.getTimestamp()),
new HoodieInstant(HoodieInstant.State.INFLIGHT, completed.getAction(), completed.getTimestamp()),
completed);
this.commitMetadata = commitMetadata;
}

@Override
public Option<byte[]> getCommitMetadata(HoodieTableMetaClient metaClient) {
return Option.of(this.commitMetadata);
}
}
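
A minimal, hypothetical JUnit usage of the new test helper; the instant time and payload below are made-up values, and the meta client can be null because the dummy ignores it:

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestDummyInstantTriple {
  @Test
  public void returnsInjectedCommitMetadata() {
    HoodieInstant completed = new HoodieInstant(
        HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000001");
    DummyInstantTriple triple = new DummyInstantTriple(completed, "{}".getBytes());
    // The dummy bypasses the file system, so no meta client is needed.
    assertEquals("{}", new String(triple.getCommitMetadata(null).get()));
  }
}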
@@ -61,7 +61,6 @@
import org.apache.hudi.testutils.HoodieClientTestHarness;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
@@ -104,9 +103,9 @@
import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@@ -492,7 +491,7 @@ public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exc
}

@Test
public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed() throws Exception {
public void testCompactionWithCorruptManifestFile() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

// do ingestion and trigger archive actions here.
@@ -505,23 +504,19 @@ public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed() throws Except
// this plan can not be deserialized.
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/*.parquet"));
Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());

Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
archiver.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1");
String s = "Dummy Content";
// stain the current merge plan file.
FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes()));

// check that damaged plan file will not block archived timeline loading.
List<String> files = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());

archiver.updateManifest("dummy.parquet");
// remove the version file created by the last manifest update
metaClient.getFs().delete(HoodieArchivedTimeline.getVersionFilePath(metaClient, HoodieArchivedTimeline.latestSnapshotVersion(metaClient)));

// check that invalid manifest file will not block archived timeline loading.
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());

// trigger several archive after left damaged merge small archive file plan.
// trigger several archive with the invalid manifest file.
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
@@ -534,15 +529,15 @@ public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed() throws Except
// check instant number
assertEquals(4 * 3 + 14, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());

// if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline.
// if there are damaged archive files and damaged plan, hoodie can still load correctly.
Path damagedFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet");
FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of("dummy".getBytes()));

assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
assertDoesNotThrow(() -> metaClient.getArchivedTimeline().reload(), "Archived timeline can skip the invalid data and manifest files smartly");
}

@Test
public void testMergeSmallArchiveFilesRecoverFromMergeFailed() throws Exception {
public void testCompactionRecoverWithoutManifestFile() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

// do ingestion and trigger archive actions here.
@@ -554,122 +549,36 @@ public void testMergeSmallArchiveFilesRecoverFromMergeFailed() throws Exception
// do a single merge small archive files
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
FileStatus[] fsStatuses = metaClient.getFs().listStatus(
new Path(metaClient.getArchivePath()), HoodieArchivedTimeline.getLayerZeroPathFilter());
Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
List<FileStatus> candidateFiles = Arrays.stream(fsStatuses).collect(Collectors.toList());
List<String> candidateFilePaths = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
List<String> candidateFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
candidateFiles.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());

String compactedFileName = HoodieTimelineArchiver.compactedFileName(candidateFiles);
archiver.buildArchiveMergePlan(candidateFilePaths, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), compactedFileName);
archiver.compactArchiveFiles(candidateFiles, compactedFileName);

// check loading archived and active timeline success
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.reload().countInstants());

String s = "Dummy Content";
// stain the current merged archive file.
Path compactedFilePath = new Path(metaClient.getArchivePath(), compactedFileName);
metaClient.getFs().delete(compactedFilePath);
FileIOUtils.createFileInPath(metaClient.getFs(), compactedFilePath, Option.of(s.getBytes()));

// do another archive actions with compaction triggered.
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}

// check result.
// we need to load archived timeline successfully and ignore the parsing damage merged archive files exception.
HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();

assertEquals(14 + 4 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());

// if there is any corrupt compaction archive file and other regular corrupt archive file.
// hoodie need throw ioe while loading archived timeline because of parsing the corrupt archive file.
Path corruptFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet");
FileIOUtils.createFileInPath(metaClient.getFs(), corruptFile, Option.of(s.getBytes()));

assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
}

@Test
public void testMergeSmallArchiveFilesRecoverFromDeleteFailed() throws Exception {
public void testCompactionCleaning() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

// do ingestion and trigger archive actions here.
for (int i = 1; i < 10; i++) {
for (int i = 1; i < 19; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}

// do a single merge small archive files
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
FileStatus[] fsStatuses = metaClient.getFs().listStatus(
new Path(metaClient.getArchivePath()), HoodieArchivedTimeline.getLayerZeroPathFilter());
Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
List<FileStatus> candidateFiles = Arrays.stream(fsStatuses).collect(Collectors.toList());
List<String> candidateFilePaths = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());

String compactedFileName = HoodieTimelineArchiver.compactedFileName(candidateFiles);
archiver.buildArchiveMergePlan(candidateFilePaths, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), compactedFileName);
archiver.compactArchiveFiles(candidateFiles, compactedFileName);

// delete only one of the small archive file to simulate delete action failed.
metaClient.getFs().delete(fsStatuses[0].getPath());
// now we have version 6, 7, 8, 9 version of snapshots

// loading archived timeline and active timeline success
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
assertEquals(5 * 3 + 4, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());

// do another archive actions with merge small archive files.
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}

// check result.
HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();

assertEquals(14 + 4 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
}

@Test
public void testLoadArchiveTimelineWithDamagedPlanFile() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

// do ingestion and trigger archive actions here.
int numInstant = 12;
for (int i = 1; i < 12; i++) {
testTable.doWriteOperation(
"000000" + String.format("%02d", i),
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
Arrays.asList("p1", "p2"),
2);
archiveAndGetCommitsList(writeConfig);
}

Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
String s = "Dummy Content";
// stain the current merge plan file.
FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes()));

// check that damaged plan file will not block archived timeline loading.
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
assertEquals(5 * 3 + 6, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());

// if there are corrupt archive files and corrupt plan, hoodie needs throw ioe while loading the archived timeline.
Path corruptFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet");
FileIOUtils.createFileInPath(metaClient.getFs(), corruptFile, Option.of(s.getBytes()));
assertEquals(4 * 3 + 14, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());

assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
assertEquals(9, HoodieArchivedTimeline.latestSnapshotVersion(metaClient));
assertEquals(Arrays.asList(6, 7, 8, 9), HoodieArchivedTimeline.allSnapshotVersions(metaClient).stream().sorted().collect(Collectors.toList()));
}
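
The assertions just above (latest snapshot version 9, with only versions 6 through 9 still present) indicate that compaction also cleans up older snapshot versions. A minimal sketch of such a retention rule, purely illustrative, with the class name, method name, and keep-count being assumptions:

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class SnapshotRetentionSketch {
  // Keep the newest `versionsToKeep` snapshot versions; everything older is eligible
  // for deletion together with its manifest and version marker.
  static List<Integer> versionsToDelete(List<Integer> allVersions, int versionsToKeep) {
    return allVersions.stream()
        .sorted(Comparator.reverseOrder())
        .skip(versionsToKeep)
        .collect(Collectors.toList());
  }
}

With versions 1 through 9 and a keep-count of 4, this returns 1 through 5, matching the 6-9 set observed in the test above.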

@Test
@@ -753,46 +662,6 @@ public static CompletableFuture allOfTerminateOnFailure(List<CompletableFuture<B
return CompletableFuture.anyOf(failure, CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])));
}

@Test
public void testLoadArchiveTimelineWithUncompletedMergeArchiveFile() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}

HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
FileStatus[] fsStatuses = metaClient.getFs().listStatus(
new Path(metaClient.getArchivePath()), HoodieArchivedTimeline.getLayerZeroPathFilter());
Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
List<FileStatus> candidateFiles = Arrays.stream(fsStatuses).collect(Collectors.toList());
List<String> candidateFilePaths = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
String compactedFileName = HoodieTimelineArchiver.compactedFileName(candidateFiles);

archiver.buildArchiveMergePlan(candidateFilePaths, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), compactedFileName);
archiver.compactArchiveFiles(candidateFiles, compactedFileName);

String s = "Dummy Content";
// stain the current merged archive file.
Path compactedFilePath = new Path(metaClient.getArchivePath(), compactedFileName);
metaClient.getFs().delete(compactedFilePath);
FileIOUtils.createFileInPath(metaClient.getFs(), compactedFilePath, Option.of(s.getBytes()));

// if there's only one corrupt compaction archive file, ignores the exception while reading this corrupt file.
HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline();

assertEquals(4 + 5 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());

// if there are a corrupt compaction archive files and other regular corrupt archive file,
// hoodie needs to throw ioe while loading archived timeline because of parsing the corrupt archive file.
Path damagedFile = new Path(metaClient.getArchivePath(), "300_301_1.parquet");
FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));

assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testNoArchivalUntilMaxArchiveConfigWithExtraInflightCommits(boolean enableMetadata) throws Exception {
