[HUDI-8023] Add multiwriter test for secondary and partition stats index (#11948)
codope committed Sep 18, 2024
1 parent 461e58b commit c87663d
Showing 2 changed files with 172 additions and 5 deletions.
@@ -20,14 +20,17 @@
package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.{HoodieCleanConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.StringType
@@ -36,7 +39,10 @@ import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource

import java.util.concurrent.Executors
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

/**
* Test cases on partition stats index with Spark datasource.
@@ -144,6 +150,56 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
verifyQueryPredicate(hudiOpts)
}

/**
* Test case to do a write with updates and then validate partition stats with multiple writers.
*/
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
def testPartitionStatsWithMultiWriter(tableType: HoodieTableType): Unit = {
val hudiOpts = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> HoodieFailedWritesCleaningPolicy.LAZY.name,
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> classOf[InProcessLockProvider].getName,
HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key() -> classOf[SimpleConcurrentFileWritesConflictResolutionStrategy].getName
)

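// Seed the table with an initial insert (SaveMode.Overwrite) before starting the concurrent writers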
doWriteAndValidateDataAndPartitionStats(hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite,
validate = false)

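// Two writers run upserts concurrently under optimistic concurrency control; a writer that loses
// the conflict resolution fails with HoodieWriteConflictException and reports false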
val executor = Executors.newFixedThreadPool(2)
implicit val executorContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
val function = new Function0[Boolean] {
override def apply(): Boolean = {
try {
doWriteAndValidateDataAndPartitionStats(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
validate = false)
true
} catch {
case _: HoodieWriteConflictException => false
case e => throw new Exception("Multi write failed", e)
}
}
}
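// Kick off both writers as futures on the two-thread pool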
val f1 = Future[Boolean] {
function.apply()
}
val f2 = Future[Boolean] {
function.apply()
}

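// Wait for both writers to finish; at least one of them must have committed successfully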
Await.result(f1, Duration("5 minutes"))
Await.result(f2, Duration("5 minutes"))

assertTrue(f1.value.get.get || f2.value.get.get)
executor.shutdownNow()
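// Verify data and partition stats after the concurrent updates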
validateDataAndPartitionStats()
}

/**
* Test case to do a write with updates using partitionBy and validate that partition filters are pushed down to the physical plan.
*/
@@ -21,11 +21,14 @@ package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
import org.apache.hudi.metadata.{HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, SparkHoodieBackedTableMetadataWriter}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
@@ -40,10 +43,13 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
import org.scalatest.Assertions.assertResult

import java.util.concurrent.Executors
import scala.collection.JavaConverters
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

/**
* Test cases for secondary index
@@ -316,6 +322,111 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
}
}

/**
* Test case to write with updates and validate the secondary index with multiple writers.
*/
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
def testSecondaryIndexWithConcurrentWrites(tableType: HoodieTableType): Unit = {
if (HoodieSparkUtils.gteqSpark3_3) {
val tableName = "hudi_multi_writer_table_" + tableType.name()

// Common Hudi options
val hudiOpts = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
HoodieWriteConfig.TBL_NAME.key -> tableName,
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> HoodieFailedWritesCleaningPolicy.LAZY.name,
HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> classOf[InProcessLockProvider].getName,
HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key() -> classOf[SimpleConcurrentFileWritesConflictResolutionStrategy].getName
)

// Create the Hudi table
spark.sql(
s"""
|CREATE TABLE $tableName (
| ts BIGINT,
| record_key_col STRING,
| not_record_key_col STRING,
| partition_key_col STRING
|) USING hudi
| OPTIONS (
| primaryKey = 'record_key_col',
| preCombineField = 'ts',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col',
| hoodie.enable.data.skipping = 'true'
| )
| PARTITIONED BY (partition_key_col)
| LOCATION '$basePath'
""".stripMargin)
// Insert some data
spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
// create secondary index
spark.sql(s"create index idx_not_record_key_col on $tableName using secondary_index(not_record_key_col)")

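// Two writers append rows concurrently under optimistic concurrency control; a writer that loses
// the conflict resolution fails with HoodieWriteConflictException and reports false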
val executor = Executors.newFixedThreadPool(2)
implicit val executorContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
val function = new Function1[Int, Boolean] {
override def apply(writerId: Int): Boolean = {
try {
val data = if (writerId == 1) Seq(
(System.currentTimeMillis(), s"row$writerId", s"value${writerId}_1", s"p$writerId")
) else Seq(
(System.currentTimeMillis(), s"row$writerId", s"value${writerId}_2", s"p$writerId")
)

val df = spark.createDataFrame(data).toDF("ts", "record_key_col", "not_record_key_col", "partition_key_col")
df.write.format("hudi")
.options(hudiOpts)
.mode("append")
.save(basePath)
true
} catch {
case _: HoodieWriteConflictException => false
case e => throw new Exception("Multi write failed", e)
}
}
}
// Set up futures for two writers
val f1 = Future[Boolean] {
function.apply(1)
}(executorContext)
val f2 = Future[Boolean] {
function.apply(2)
}(executorContext)

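// Wait for both writers to finish; at least one of them must have committed successfully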
Await.result(f1, Duration("5 minutes"))
Await.result(f2, Duration("5 minutes"))

assertTrue(f1.value.get.get || f2.value.get.get)
executor.shutdownNow()

// Validate the secondary index got created
metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))

// Query the secondary index metadata
checkAnswer(
s"""
|SELECT key, SecondaryIndexMetadata.recordKey, SecondaryIndexMetadata.isDeleted
|FROM hudi_metadata('$basePath')
|WHERE type=7
""".stripMargin)(
Seq("abc", "row1", true),
Seq("cde", "row2", true),
Seq("value1_1", "row1", false),
Seq("value2_2", "row2", false)
)
}
}

private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
}
