From 60dcc8af7e9dc832dff8de7c007848c151e5673e Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Tue, 25 Jun 2024 10:49:26 -0700
Subject: [PATCH] [HUDI-6508] Support compilation on Java 11

---
 .github/workflows/bot.yml                     | 167 ++++++++++++++++--
 .../table/TestHoodieMergeOnReadTable.java     |   8 +-
 .../commit/TestCopyOnWriteActionExecutor.java |  15 +-
 .../metadata/HoodieTableMetadataUtil.java     |  21 +--
 hudi-examples/hudi-examples-common/pom.xml    |  14 --
 hudi-examples/hudi-examples-java/pom.xml      |  14 --
 .../apache/hudi/common/util/ParquetUtils.java |  21 +--
 7 files changed, 186 insertions(+), 74 deletions(-)

diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 72200c4822d1..5d659123f133 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -245,12 +245,6 @@ jobs:
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.4"
             sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-          - scalaProfile: "scala-2.12"
-            sparkProfile: "spark3.5"
-            sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
-          - scalaProfile: "scala-2.13"
-            sparkProfile: "spark3.5"
-            sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"

     steps:
       - uses: actions/checkout@v3
@@ -285,7 +279,6 @@ jobs:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
           SPARK_MODULES: ${{ matrix.sparkModules }}
-        if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
         run: mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
       - name: Java FT - Spark
@@ -293,7 +286,6 @@
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
           SPARK_MODULES: ${{ matrix.sparkModules }}
-        if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
         run: mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

@@ -308,6 +300,49 @@ jobs:
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.4"
             sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up JDK 8
+        uses: actions/setup-java@v3
+        with:
+          java-version: '8'
+          distribution: 'temurin'
+          architecture: x64
+          cache: maven
+      - name: Build Project
+        env:
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+        run:
+          mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
+      - name: Set up JDK 17
+        uses: actions/setup-java@v3
+        with:
+          java-version: '17'
+          distribution: 'temurin'
+          architecture: x64
+          cache: maven
+      - name: Scala UT - Common & Spark
+        env:
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_MODULES: ${{ matrix.sparkModules }}
+        run:
+          mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
+      - name: Scala FT - Spark
+        env:
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_MODULES: ${{ matrix.sparkModules }}
+        run:
+          mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
+
+  test-spark-java11-17-java-tests:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        include:
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.5"
             sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
@@ -317,10 +352,65 @@ jobs:

     steps:
       - uses: actions/checkout@v3
-      - name: Set up JDK 8
+      - name: Set up JDK 11
         uses: actions/setup-java@v3
         with:
-          java-version: '8'
+          java-version: '11'
+          distribution: 'temurin'
+          architecture: x64
+          cache: maven
+      - name: Build Project
+        env:
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+        run:
+          mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
+      - name: Set up JDK 17
+        uses: actions/setup-java@v3
+        with:
+          java-version: '17'
+          distribution: 'temurin'
+          architecture: x64
+          cache: maven
+      - name: Quickstart Test
+        env:
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+        run:
+          mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl hudi-examples/hudi-examples-spark $MVN_ARGS
+      - name: Java UT - Common & Spark
+        env:
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_MODULES: ${{ matrix.sparkModules }}
+        run:
+          mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
+      - name: Java FT - Spark
+        env:
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_MODULES: ${{ matrix.sparkModules }}
+        run:
+          mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
+
+  test-spark-java11-17-scala-tests:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        include:
+          - scalaProfile: "scala-2.12"
+            sparkProfile: "spark3.5"
+            sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
+          - scalaProfile: "scala-2.13"
+            sparkProfile: "spark3.5"
+            sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up JDK 11
+        uses: actions/setup-java@v3
+        with:
+          java-version: '11'
           distribution: 'temurin'
           architecture: x64
           cache: maven
@@ -342,7 +432,6 @@
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
           SPARK_MODULES: ${{ matrix.sparkModules }}
-        if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
         run: mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
       - name: Scala FT - Spark
@@ -350,7 +439,6 @@
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
           SPARK_MODULES: ${{ matrix.sparkModules }}
-        if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
         run: mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

@@ -527,6 +615,61 @@ jobs:
           HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
           ./packaging/bundle-validation/ci_run.sh hudi_docker_java17 $HUDI_VERSION openjdk17

+  validate-bundles-java11:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        include:
+          - scalaProfile: 'scala-2.13'
+            flinkProfile: 'flink1.18'
+            sparkProfile: 'spark3.5'
+            sparkRuntime: 'spark3.5.0'
+          - scalaProfile: 'scala-2.12'
+            flinkProfile: 'flink1.18'
+            sparkProfile: 'spark3.5'
+            sparkRuntime: 'spark3.5.0'
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up JDK 11
+        uses: actions/setup-java@v3
+        with:
+          java-version: '11'
+          distribution: 'temurin'
+          architecture: x64
+          cache: maven
+      - name: Build Project
+        env:
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+        run: |
+          if [ "$SCALA_PROFILE" == "scala-2.13" ]; then
+            mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DdeployArtifacts=true -DskipTests=true $MVN_ARGS -Dmaven.javadoc.skip=true -pl packaging/hudi-hadoop-mr-bundle,packaging/hudi-spark-bundle,packaging/hudi-utilities-bundle,packaging/hudi-utilities-slim-bundle -am
+          else
+            mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DdeployArtifacts=true -DskipTests=true $MVN_ARGS -Dmaven.javadoc.skip=true
+            # TODO remove the sudo below. It's a needed workaround as detailed in HUDI-5708.
+            sudo chown -R "$USER:$(id -g -n)" hudi-platform-service/hudi-metaserver/target/generated-sources
+            mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -DdeployArtifacts=true -DskipTests=true $MVN_ARGS -Dmaven.javadoc.skip=true -pl packaging/hudi-flink-bundle -am -Davro.version=1.10.0
+          fi
+      - name: IT - Bundle Validation - OpenJDK 11
+        env:
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+        run: |
+          HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
+          ./packaging/bundle-validation/ci_run.sh hudi_docker_java11 $HUDI_VERSION openjdk11
+      - name: IT - Bundle Validation - OpenJDK 17
+        env:
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+        run: |
+          HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
+          ./packaging/bundle-validation/ci_run.sh hudi_docker_java17 $HUDI_VERSION openjdk17
+
   integration-tests:
     runs-on: ubuntu-latest
     strategy:
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 9e1f4277c57f..76d534630e11 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -74,6 +74,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -695,9 +696,10 @@ public void testHandleUpdateWithMultiplePartitions() throws Exception {
     BaseSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context(), cfg, hoodieTable,
         newDeleteTime, HoodieJavaRDD.of(deleteRDD));
     actionExecutor.getUpsertPartitioner(new WorkloadProfile(buildProfile(deleteRDD)));
-    final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Arrays.asList(1)).map(x -> {
-      return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator());
-    }).map(Transformations::flatten).collect();
+    final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Arrays.asList(1))
+        .map(x -> (Iterator<List<WriteStatus>>)
+            actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator()))
+        .map(Transformations::flatten).collect();

     // Verify there are errors because records are from multiple partitions (but handleUpdate is invoked for
     // specific partition)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 03f0cf158cdd..1164275524c4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -80,6 +80,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -473,9 +474,10 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
     BaseSparkCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(context, config, table,
         instantTime, context.parallelize(inserts));

-    final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator());
-    }).map(Transformations::flatten).collect();
+    final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1))
+        .map(x -> (Iterator<List<WriteStatus>>)
+            actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator()))
+        .map(Transformations::flatten).collect();

     WriteStatus writeStatus = ws.get(0).get(0);
     String fileId = writeStatus.getFileId();
@@ -492,9 +494,10 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
     BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
         instantTime, context.parallelize(updates));

-    final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
-    }).map(Transformations::flatten).collect();
+    final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1))
+        .map(x -> (Iterator<List<WriteStatus>>)
+            newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator()))
+        .map(Transformations::flatten).collect();

     assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
   }
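The two test changes above share one pattern: the action executor is held as a raw type, so handleUpdate/handleInsert hand back a raw Iterator inside the Spark lambda, and the newer compilers targeted by this patch apparently stop inferring the element type that the later Transformations::flatten step needs; the fix is to spell it out with an explicit Iterator<List<WriteStatus>> cast. Below is a minimal, self-contained sketch of that idiom; the names (RawReturnCastSketch, produce) are hypothetical illustrations, not Hudi code.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class RawReturnCastSketch {
  // Raw-typed helper standing in for the raw-typed executor used in the tests above.
  @SuppressWarnings("rawtypes")
  static Iterator produce(List<String> values) {
    return Arrays.asList(values).iterator();
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    List<String> values = Arrays.asList("a", "b");
    // The cast pins the lambda's return type to Iterator<List<String>>; the Hudi tests
    // appear to need the analogous cast because the raw receiver erases the element type
    // that the downstream flatten mapping expects.
    Function<Integer, Iterator<List<String>>> fn =
        x -> (Iterator<List<String>>) produce(values);
    System.out.println(fn.apply(1).next());
  }
}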
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 46f9a8b50a3a..4c96148aab11 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -105,7 +105,6 @@
 import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

@@ -244,27 +243,25 @@ class ColumnStats {
           });
         });

-    Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector =
-        Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity());
-
-    return (Map<String, HoodieColumnRangeMetadata<Comparable>>) targetFields.stream()
-        .map(field -> {
+    Stream<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadataStream =
+        targetFields.stream().map(field -> {
           ColumnStats colStats = allColumnStats.get(field.name());
           return HoodieColumnRangeMetadata.create(
               filePath,
               field.name(),
               colStats == null ? null : coerceToComparable(field.schema(), colStats.minValue),
               colStats == null ? null : coerceToComparable(field.schema(), colStats.maxValue),
-              colStats == null ? 0 : colStats.nullCount,
-              colStats == null ? 0 : colStats.valueCount,
+              colStats == null ? 0L : colStats.nullCount,
+              colStats == null ? 0L : colStats.valueCount,
               // NOTE: Size and compressed size statistics are set to 0 to make sure we're not
               // mixing up those provided by Parquet with the ones from other encodings,
               // since those are not directly comparable
-              0,
-              0
+              0L,
+              0L
           );
-        })
-        .collect(collector);
+        });
+    return hoodieColumnRangeMetadataStream.collect(
+        Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity()));
   }

   /**
diff --git a/hudi-examples/hudi-examples-common/pom.xml b/hudi-examples/hudi-examples-common/pom.xml
index 4b253e18d0f9..5a01759b589c 100644
--- a/hudi-examples/hudi-examples-common/pom.xml
+++ b/hudi-examples/hudi-examples-common/pom.xml
@@ -40,20 +40,6 @@
     <plugins>
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>scala-compile-first</id>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>add-source</goal>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
diff --git a/hudi-examples/hudi-examples-java/pom.xml b/hudi-examples/hudi-examples-java/pom.xml
index e69fed389a43..3916aae2e402 100644
--- a/hudi-examples/hudi-examples-java/pom.xml
+++ b/hudi-examples/hudi-examples-java/pom.xml
@@ -59,20 +59,6 @@
     <plugins>
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>scala-compile-first</id>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>add-source</goal>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
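HoodieTableMetadataUtil above and ParquetUtils below apply the same refactor: instead of pre-declaring a fully specialized Collector and casting the result of collect(), the intermediate Stream is given an explicit element type and the collector is built inline. A self-contained sketch of the before/after shapes, using a hypothetical Stat class in place of HoodieColumnRangeMetadata<Comparable>:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TypedStreamSketch {
  // Hypothetical stand-in for the Hudi column-range metadata type.
  static final class Stat {
    final String column;
    Stat(String column) {
      this.column = column;
    }
    String getColumn() {
      return column;
    }
  }

  public static void main(String[] args) {
    List<Stat> stats = Arrays.asList(new Stat("a"), new Stat("b"));

    // Old shape: a Collector with fully specialized type parameters, declared up front
    // (the Hudi code additionally cast the collect() result).
    Collector<Stat, ?, Map<String, Stat>> collector =
        Collectors.toMap(Stat::getColumn, Function.identity());
    Map<String, Stat> byColumnOld = stats.stream().collect(collector);

    // New shape: give the intermediate Stream an explicit element type and build the
    // collector inline, so no pre-declared Collector or result cast is needed.
    Stream<Stat> statStream = stats.stream();
    Map<String, Stat> byColumnNew =
        statStream.collect(Collectors.toMap(Stat::getColumn, Function.identity()));

    System.out.println(byColumnOld.keySet().equals(byColumnNew.keySet()));
  }
}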
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index 1dad2d237cff..7de11eed609c 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -72,7 +72,6 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

@@ -264,20 +263,14 @@ public List<HoodieColumnRangeMetadata<Comparable>> readColumnStatsFromMetadata(H
                                                                                 List<String> columnList) {
     ParquetMetadata metadata = readMetadata(storage, filePath);

-    // NOTE: This collector has to have fully specialized generic type params since
-    // Java 1.8 struggles to infer them
-    Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
-        Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);
-
     // Collect stats from all individual Parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
-        (Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential()
-            .flatMap(blockMetaData ->
-                blockMetaData.getColumns().stream()
+    Stream<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadataStream =
+        metadata.getBlocks().stream().sequential().flatMap(blockMetaData ->
+            blockMetaData.getColumns().stream()
                 .filter(f -> columnList.contains(f.getPath().toDotString()))
                 .map(columnChunkMetaData -> {
                   Statistics stats = columnChunkMetaData.getStatistics();
-                  return HoodieColumnRangeMetadata.create(
+                  return (HoodieColumnRangeMetadata<Comparable>) HoodieColumnRangeMetadata.create(
                       filePath.getName(),
                       columnChunkMetaData.getPath().toDotString(),
                       convertToNativeJavaType(
@@ -294,8 +287,10 @@ public List<HoodieColumnRangeMetadata<Comparable>> readColumnStatsFromMetadata(H
                       columnChunkMetaData.getTotalSize(),
                       columnChunkMetaData.getTotalUncompressedSize());
                 })
-            )
-            .collect(groupingByCollector);
+        );
+
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
+        hoodieColumnRangeMetadataStream.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));

     // Combine those into file-level statistics
     // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer