Skip to content

Commit

Permalink
Merge branch 'apache:master' into throw_exception_if_recordkey_field_…
Browse files Browse the repository at this point in the history
…does_not_exist
  • Loading branch information
jonvex committed Sep 19, 2024
2 parents 695c2eb + f6a2ef4 commit 23c4f46
Show file tree
Hide file tree
Showing 131 changed files with 3,586 additions and 831 deletions.
5 changes: 5 additions & 0 deletions hudi-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
Expand Down Expand Up @@ -147,29 +148,12 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
private final int changedPartitionsReadParallelism;
private final int changeParallelism;

public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
super(config);
try {
GlueAsyncClientBuilder awsGlueBuilder = GlueAsyncClient.builder()
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(config.getProps()));
awsGlueBuilder = config.getString(AWS_GLUE_ENDPOINT) == null ? awsGlueBuilder :
awsGlueBuilder.endpointOverride(new URI(config.getString(AWS_GLUE_ENDPOINT)));
awsGlueBuilder = config.getString(AWS_GLUE_REGION) == null ? awsGlueBuilder :
awsGlueBuilder.region(Region.of(config.getString(AWS_GLUE_REGION)));
this.awsGlue = awsGlueBuilder.build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.skipTableArchive = config.getBooleanOrDefault(GlueCatalogSyncClientConfig.GLUE_SKIP_TABLE_ARCHIVE);
this.enableMetadataTable = Boolean.toString(config.getBoolean(GLUE_METADATA_FILE_LISTING)).toUpperCase();
this.allPartitionsReadParallelism = config.getIntOrDefault(ALL_PARTITIONS_READ_PARALLELISM);
this.changedPartitionsReadParallelism = config.getIntOrDefault(CHANGED_PARTITIONS_READ_PARALLELISM);
this.changeParallelism = config.getIntOrDefault(PARTITION_CHANGE_PARALLELISM);
public AWSGlueCatalogSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaClient) {
this(buildAsyncClient(config), config, metaClient);
}

AWSGlueCatalogSyncClient(GlueAsyncClient awsGlue, HiveSyncConfig config) {
super(config);
AWSGlueCatalogSyncClient(GlueAsyncClient awsGlue, HiveSyncConfig config, HoodieTableMetaClient metaClient) {
super(config, metaClient);
this.awsGlue = awsGlue;
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.skipTableArchive = config.getBooleanOrDefault(GlueCatalogSyncClientConfig.GLUE_SKIP_TABLE_ARCHIVE);
Expand All @@ -179,6 +163,20 @@ public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
this.changeParallelism = config.getIntOrDefault(PARTITION_CHANGE_PARALLELISM);
}

/**
 * Builds the {@link GlueAsyncClient} from the sync configuration.
 *
 * <p>Credentials are obtained through {@link HoodieAWSCredentialsProviderFactory} using the
 * config's properties. The endpoint override and the region are applied only when
 * {@code AWS_GLUE_ENDPOINT} and {@code AWS_GLUE_REGION}, respectively, resolve to a non-null value.
 *
 * @param config sync configuration supplying credentials properties, endpoint and region
 * @return the built Glue async client
 * @throws RuntimeException wrapping the {@link URISyntaxException} raised when the configured
 *                          endpoint cannot be parsed as a URI
 */
private static GlueAsyncClient buildAsyncClient(HiveSyncConfig config) {
  GlueAsyncClientBuilder clientBuilder = GlueAsyncClient.builder()
      .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(config.getProps()));

  // Optional endpoint override; parsing the URI is the only step that can fail with a checked exception.
  String endpoint = config.getString(AWS_GLUE_ENDPOINT);
  if (endpoint != null) {
    try {
      clientBuilder = clientBuilder.endpointOverride(new URI(endpoint));
    } catch (URISyntaxException e) {
      throw new RuntimeException(e);
    }
  }

  // Optional explicit region.
  String region = config.getString(AWS_GLUE_REGION);
  if (region != null) {
    clientBuilder = clientBuilder.region(Region.of(region));
  }

  return clientBuilder.build();
}

private List<Partition> getPartitionsSegment(Segment segment, String tableName) {
try {
List<Partition> partitions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.hudi.aws.sync;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
Expand All @@ -44,13 +46,13 @@
*/
public class AwsGlueCatalogSyncTool extends HiveSyncTool {

public AwsGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);
public AwsGlueCatalogSyncTool(Properties props, Configuration hadoopConf, Option<HoodieTableMetaClient> metaClient) {
super(props, hadoopConf, metaClient);
}

@Override
protected void initSyncClient(HiveSyncConfig hiveSyncConfig) {
syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
protected void initSyncClient(HiveSyncConfig hiveSyncConfig, HoodieTableMetaClient metaClient) {
syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, metaClient);
}

@Override
Expand All @@ -69,7 +71,7 @@ public static void main(String[] args) {
// HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
TypedProperties props = params.toProps();
Configuration hadoopConf = HadoopFSUtils.getFs(props.getString(META_SYNC_BASE_PATH.key()), new Configuration()).getConf();
try (AwsGlueCatalogSyncTool tool = new AwsGlueCatalogSyncTool(props, hadoopConf)) {
try (AwsGlueCatalogSyncTool tool = new AwsGlueCatalogSyncTool(props, hadoopConf, Option.empty())) {
tool.syncHoodieTable();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public static DynamoDbBasedLockConfig.Builder newBuilder() {
return new DynamoDbBasedLockConfig.Builder();
}

// Maximum size, in bytes, allowed for a DynamoDB (DDB) partition key.
public static final int MAX_PARTITION_KEY_SIZE_BYTE = 2048;

// configs for DynamoDb based locks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ public void setUp() throws Exception {
fileSystem = hiveSyncConfig.getHadoopFileSystem();
fileSystem.mkdirs(new Path(tablePath));
StorageConfiguration<?> configuration = HadoopFSUtils.getStorageConf(new Configuration());
HoodieTableMetaClient.withPropertyBuilder()
HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(TABLE_NAME)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, tablePath);

glueSync = new AWSGlueCatalogSyncClient(new HiveSyncConfig(hiveSyncProps));
glueSync = new AWSGlueCatalogSyncClient(new HiveSyncConfig(hiveSyncProps), metaClient);
glueSync.awsGlue.createDatabase(CreateDatabaseRequest.builder().databaseInput(DatabaseInput.builder().name(DB_NAME).build()).build()).get();

glueSync.awsGlue.createTable(CreateTableRequest.builder().databaseName(DB_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hudi.aws.sync;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.HiveSyncConfig;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -32,11 +34,11 @@ class MockAwsGlueCatalogSyncTool extends AwsGlueCatalogSyncTool {
private GlueAsyncClient mockAwsGlue;

public MockAwsGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);
super(props, hadoopConf, Option.empty());
}

@Override
protected void initSyncClient(HiveSyncConfig hiveSyncConfig) {
syncClient = new AWSGlueCatalogSyncClient(mockAwsGlue, hiveSyncConfig);
protected void initSyncClient(HiveSyncConfig hiveSyncConfig, HoodieTableMetaClient metaClient) {
syncClient = new AWSGlueCatalogSyncClient(mockAwsGlue, hiveSyncConfig, metaClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class TestAWSGlueSyncClient {
@BeforeEach
void setUp() throws IOException {
GlueTestUtil.setUp();
awsGlueSyncClient = new AWSGlueCatalogSyncClient(mockAwsGlue, GlueTestUtil.getHiveSyncConfig());
awsGlueSyncClient = new AWSGlueCatalogSyncClient(mockAwsGlue, GlueTestUtil.getHiveSyncConfig(), GlueTestUtil.getMetaClient());
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,32 @@
package org.apache.hudi.aws.sync;

import org.apache.hudi.aws.testutils.GlueTestUtil;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.services.glue.GlueAsyncClient;
import software.amazon.awssdk.services.glue.GlueAsyncClientBuilder;

import java.io.IOException;

import static org.apache.hudi.aws.testutils.GlueTestUtil.getHadoopConf;
import static org.apache.hudi.aws.testutils.GlueTestUtil.glueSyncProps;
import static org.apache.hudi.config.GlueCatalogSyncClientConfig.RECREATE_GLUE_TABLE_ON_ERROR;
import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class TestAwsGlueSyncTool {
Expand All @@ -42,7 +54,7 @@ class TestAwsGlueSyncTool {
@BeforeEach
void setUp() throws IOException {
GlueTestUtil.setUp();
awsGlueCatalogSyncTool = new MockAwsGlueCatalogSyncTool(glueSyncProps, GlueTestUtil.getHadoopConf());
awsGlueCatalogSyncTool = new MockAwsGlueCatalogSyncTool(glueSyncProps, getHadoopConf());
}

@AfterEach
Expand All @@ -64,6 +76,27 @@ void testShouldRecreateAndSyncTableOverride() {
}

private void reinitGlueSyncTool() {
awsGlueCatalogSyncTool = new MockAwsGlueCatalogSyncTool(glueSyncProps, GlueTestUtil.getHadoopConf());
awsGlueCatalogSyncTool = new MockAwsGlueCatalogSyncTool(glueSyncProps, getHadoopConf());
}

/**
 * Checks that {@link SyncUtilHelpers#instantiateMetaSyncTool} can create an
 * {@link AwsGlueCatalogSyncTool} by class name when no meta client is supplied.
 * {@code GlueAsyncClient.builder()} is statically mocked so the client returned by
 * {@code build()} is a mock.
 */
@Test
void validateInitThroughSyncTool() throws Exception {
  // The static mock is scoped to this try block and released when it closes.
  try (MockedStatic<GlueAsyncClient> glueClientStatics = mockStatic(GlueAsyncClient.class)) {
    GlueAsyncClientBuilder stubbedBuilder = mock(GlueAsyncClientBuilder.class);
    GlueAsyncClient stubbedClient = mock(GlueAsyncClient.class);

    // Route GlueAsyncClient.builder() -> stubbedBuilder -> stubbedClient.
    glueClientStatics.when(GlueAsyncClient::builder).thenReturn(stubbedBuilder);
    when(stubbedBuilder.credentialsProvider(any())).thenReturn(stubbedBuilder);
    when(stubbedBuilder.build()).thenReturn(stubbedClient);

    String syncToolClassName = AwsGlueCatalogSyncTool.class.getName();
    String tableBasePath = GlueTestUtil.getMetaClient().getBasePath().toString();

    HoodieSyncTool syncTool = SyncUtilHelpers.instantiateMetaSyncTool(
        syncToolClassName,
        new TypedProperties(),
        getHadoopConf(),
        GlueTestUtil.fileSystem,
        tableBasePath,
        "PARQUET",
        Option.empty());

    assertTrue(syncTool instanceof AwsGlueCatalogSyncTool);
    syncTool.close();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class GlueTestUtil {
public static FileSystem fileSystem;
private static HiveSyncConfig hiveSyncConfig;
private static Configuration hadoopConf;
private static HoodieTableMetaClient metaClient;

public static void setUp() throws IOException {
basePath = Files.createTempDirectory("glueClientTest" + Instant.now().toEpochMilli()).toUri().toString();
Expand Down Expand Up @@ -103,7 +104,7 @@ public static void createHoodieTable() throws IOException {
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
}
HoodieTableMetaClient.withPropertyBuilder()
metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(TABLE_NAME)
.setPayloadClass(HoodieAvroPayload.class)
Expand All @@ -130,4 +131,8 @@ private static void createMetaFile(String basePath, String fileName, HoodieCommi
public static Column getColumn(String name, String type, String comment) {
return Column.builder().name(name).type(type).comment(comment).build();
}

/**
 * Returns the shared {@link HoodieTableMetaClient} held in this utility's static
 * {@code metaClient} field.
 *
 * <p>The field is assigned in {@code createHoodieTable()}.
 * NOTE(review): presumably this returns {@code null} until that method has run — confirm
 * that {@code setUp()} invokes it before callers use this accessor.
 *
 * @return the current value of the static {@code metaClient} field
 */
public static HoodieTableMetaClient getMetaClient() {
return metaClient;
}
}
6 changes: 5 additions & 1 deletion hudi-client/hudi-client-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit 23c4f46

Please sign in to comment.