diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index 1d5b09629e4c5..b24aecf46c1a7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -217,16 +217,24 @@ public class HoodieLockConfig extends HoodieConfig {
       .withDocumentation("Lock provider class name, this should be subclass of "
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
 
-  /** @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP = WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key();
-  /** @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String DEFAULT_WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS = WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.defaultValue();
-  /** @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PROVIDER_CLASS_NAME.key();
-  /** @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String DEFAULT_LOCK_PROVIDER_CLASS = LOCK_PROVIDER_CLASS_NAME.defaultValue();
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ba94d80d674c6..01b8fa5594899 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -558,6 +558,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
       .withDocumentation(WriteConcurrencyMode.class);
 
+  public static final ConfigProperty<Integer> NUM_RETRIES_ON_CONFLICT_FAILURES = ConfigProperty
+      .key("hoodie.write.num.retries.on.conflict.failures")
+      .defaultValue(0)
+      .sinceVersion("0.13.0")
+      .withDocumentation("Maximum number of times to retry a batch on conflict failure.");
+
   public static final ConfigProperty<String> WRITE_SCHEMA_OVERRIDE = ConfigProperty
       .key("hoodie.write.schema")
       .noDefaultValue()
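Reviewer note: below is a minimal usage sketch, not part of the patch, showing how a writer job would turn on the new retry knob alongside the optimistic-concurrency settings it depends on. The option keys and the lock provider are the ones exercised by the test further down; `df` and `basePath` are assumed to exist in the caller's scope.

```scala
import org.apache.spark.sql.SaveMode

// Sketch: enable up to 2 whole-batch retries when a write conflict is detected.
df.write.format("org.apache.hudi")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
  // New in this patch; default is 0, i.e. fail on the first conflict.
  .option("hoodie.write.num.retries.on.conflict.failures", "2")
  .mode(SaveMode.Append)
  .save(basePath)
```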
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e98d72d82844c..57baba29c92e1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.AutoRecordKeyGenerationUtils.{isAutoGenerateRecordKeys, mayBeValidateParamsForAutoGenerationOfRecordKeys}
+import org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
 import org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
@@ -48,17 +48,15 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
 import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
+import org.apache.hudi.exception.{HoodieException, HoodieWriteConflictException, SchemaCompatibilityException}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
-import org.apache.hudi.index.HoodieIndex
-import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
-import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{BaseKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
@@ -122,6 +120,38 @@ object HoodieSparkSqlWriter {
            sourceDf: DataFrame,
            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+
+  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
+    var succeeded = false
+    var counter = 0
+    val maxRetry: Integer = Integer.parseInt(optParams.getOrElse(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.defaultValue().toString))
+    var toReturn: (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = null
+
+    while (counter <= maxRetry && !succeeded) {
+      try {
+        toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+        log.warn(s"Write succeeded on attempt $counter")
+        succeeded = true
+      } catch {
+        case e: HoodieWriteConflictException =>
+          val writeConcurrencyMode = optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+          if (writeConcurrencyMode.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()) && counter < maxRetry) {
+            counter += 1
+            log.warn(s"Conflict detected. Retrying, attempt $counter of $maxRetry")
+          } else {
+            throw e
+          }
+      }
+    }
+    toReturn
+  }
+
+  def writeInternal(sqlContext: SQLContext,
+                    mode: SaveMode,
+                    optParams: Map[String, String],
+                    sourceDf: DataFrame,
+                    streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+                    hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
   (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
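Reviewer note: the retry policy added above, distilled into a standalone sketch for review. `retryOnConflict` and `runBatch` are illustrative names, not part of the patch, and the concurrency-mode guard from the real code is elided. One subtlety worth calling out: `counter` starts at 0 and the loop runs while `counter <= maxRetry`, so setting `hoodie.write.num.retries.on.conflict.failures = N` allows up to N + 1 total attempts.

```scala
// Standalone sketch of the bounded-retry loop above (illustrative names only).
def retryOnConflict[T](maxRetry: Int)(runBatch: => T): T = {
  var counter = 0
  while (true) {
    try {
      return runBatch // counter is 0-based, so maxRetry = N permits N + 1 attempts in total
    } catch {
      case e: Exception if counter >= maxRetry => throw e // budget exhausted: surface the failure
      case _: Exception => counter += 1 // stands in for HoodieWriteConflictException; loop again
    }
  }
  sys.error("unreachable") // while (true) never falls through
}

// e.g. retryOnConflict(maxRetry = 2) { writeInternal(...) }
```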
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bb36b9cdd271a..104996d5c4fdb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -23,11 +23,11 @@ import org.apache.hudi.DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, TimelineUtils}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -59,6 +59,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 
 import java.sql.{Date, Timestamp}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.function.Consumer
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -555,11 +556,70 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(snapshotDF2.count(), 80)
   }
 
+  /**
+   * Test retries on conflict failures.
+   */
+  @ParameterizedTest
+  @ValueSource(ints = Array(0, 2))
+  def testCopyOnWriteConcurrentUpdates(numRetries: Integer): Unit = {
+    initTestDataGenerator()
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 1000)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
+      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
+      .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(1000, snapshotDF1.count())
+
+    val countDownLatch = new CountDownLatch(2)
+    for (x <- 1 to 2) {
+      val thread = new Thread(new UpdateThread(dataGen, spark, commonOpts, basePath, x + "00", countDownLatch, numRetries))
+      thread.setName(x + "00_THREAD")
+      thread.start()
+    }
+    countDownLatch.await(1, TimeUnit.MINUTES)
+
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    if (numRetries > 0) {
+      assertEquals(snapshotDF2.count(), 3000)
+      assertEquals(HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size(), 3)
+    } else {
+      // only one of the two threads wins the conflict, hence 2000 rows
+      assertEquals(snapshotDF2.count(), 2000)
+      assertEquals(HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size(), 2)
+    }
+  }
+
+  class UpdateThread(dataGen: HoodieTestDataGenerator, spark: SparkSession, commonOpts: Map[String, String], basePath: String,
+                     instantTime: String, countDownLatch: CountDownLatch, numRetries: Integer = 0) extends Runnable {
+    override def run() {
+      val updateRecs = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 500)).toList
+      val insertRecs = recordsToStrings(dataGen.generateInserts(instantTime, 1000)).toList
+      val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecs, 2))
+      val insertDf = spark.read.json(spark.sparkContext.parallelize(insertRecs, 2))
+      updateDf.union(insertDf).write.format("org.apache.hudi")
+        .options(commonOpts)
+        .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
+        .option("hoodie.cleaner.policy.failed.writes", "LAZY")
+        .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+        .option(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), numRetries.toString)
+        .mode(SaveMode.Append)
+        .save(basePath)
+      countDownLatch.countDown()
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
   def testOverWriteModeUseReplaceAction(recordType: HoodieRecordType): Unit = {
     val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
-
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
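Reviewer note: a quick derivation of the asserted row counts, not part of the patch; all values come from the test above. Updates rewrite existing keys and add no rows, so only inserts change the count.

```scala
// Derivation of the asserted counts in testCopyOnWriteConcurrentUpdates.
val initialInserts = 1000   // the Overwrite commit seeds the table
val perThreadInserts = 1000 // each UpdateThread unions 1000 inserts...
val perThreadUpdates = 500  // ...with 500 updates, which add no new rows
val threads = 2

// With retries (numRetries = 2): both threads eventually commit -> 3 commits.
assert(initialInserts + threads * perThreadInserts == 3000)

// Without retries (numRetries = 0): the losing thread aborts -> 2 commits.
assert(initialInserts + (threads - 1) * perThreadInserts == 2000)
```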