
[HUDI-6508] Fix compile errors with JDK11 #9300

Closed · wants to merge 1 commit
4 changes: 2 additions & 2 deletions .github/workflows/bot.yml
@@ -166,10 +166,10 @@ jobs:

steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
- name: Set up JDK 11

Review comment from the PR author: when we have a Java 11 pipeline, move this change there.

uses: actions/setup-java@v3
with:
java-version: '8'
java-version: '11'
distribution: 'adopt'
architecture: x64
cache: maven
@@ -72,8 +72,9 @@
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -687,9 +688,9 @@ public void testHandleUpdateWithMultiplePartitions() throws Exception {
BaseSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context(), cfg, hoodieTable,
newDeleteTime, HoodieJavaRDD.of(deleteRDD));
actionExecutor.getUpsertPartitioner(new WorkloadProfile(buildProfile(deleteRDD)));
final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator());
}).map(Transformations::flatten).collect();
final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Collections.singletonList(1))
.map(x -> (Iterator<List<WriteStatus>>) actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator()))
.map(Transformations::flatten).collect();

// Verify there are errors because records are from multiple partitions (but handleUpdate is invoked for
// specific partition)
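
For context, here is a minimal, self-contained sketch (hypothetical names, not Hudi's or Spark's actual classes) of why these tests now cast the lambda result to Iterator<List<WriteStatus>>: the executor variables are declared with raw types, so handleUpdate/handleInsert are seen as returning a raw Iterator, and javac 11 reportedly refuses to carry the parameterized type through to the Transformations::flatten step. The explicit cast pins that type.

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class RawIteratorCastSketch {

  // Stand-in for JavaRDD: wraps one value and offers map() whose result type R is inferred.
  static final class Box<T> {
    private final T value;

    Box(T value) {
      this.value = value;
    }

    <R> Box<R> map(Function<T, R> fn) {
      return new Box<>(fn.apply(value));
    }

    T get() {
      return value;
    }
  }

  // Stand-in for the commit/delta-commit action executors used in the tests.
  static class Executor<T> {
    Iterator<List<T>> handleUpdate(List<T> records) {
      return Collections.singletonList(records).iterator();
    }
  }

  // Stand-in for Transformations::flatten, which needs a parameterized Iterator.
  static <T> List<T> flatten(Iterator<List<T>> it) {
    return it.next();
  }

  public static void main(String[] args) {
    @SuppressWarnings("rawtypes")
    Executor executor = new Executor();  // raw type, like BaseSparkCommitActionExecutor in the tests

    @SuppressWarnings("unchecked")
    List<String> result = new Box<>(1)
        // Raw-type erasure makes handleUpdate return a raw Iterator; the cast restores
        // the parameterized type so map()'s R is inferred as Iterator<List<String>>.
        .map(x -> (Iterator<List<String>>) executor.handleUpdate(Arrays.asList("a", "b")))
        .map(RawIteratorCastSketch::flatten)
        .get();

    System.out.println(result);  // [a, b]
  }
}

The same cast-based fix appears in both test files touched by this PR.
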
@@ -78,7 +78,9 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -463,9 +465,9 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
BaseSparkCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(context, config, table,
instantTime, context.parallelize(inserts));
final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator());
}).map(Transformations::flatten).collect();
final List<List<WriteStatus>> ws = jsc.parallelize(Collections.singletonList(1))
.map(x -> (Iterator<List<WriteStatus>>) actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator()))
.map(Transformations::flatten).collect();

WriteStatus writeStatus = ws.get(0).get(0);
String fileId = writeStatus.getFileId();
@@ -477,9 +479,9 @@
table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, HoodieTableMetaClient.reload(metaClient));
BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
instantTime, context.parallelize(updates));
final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
}).map(Transformations::flatten).collect();
final List<List<WriteStatus>> updateStatus = jsc.parallelize(Collections.singletonList(1))
.map(x -> (Iterator<List<WriteStatus>>) newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator()))
.map(Transformations::flatten).collect();
assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
}

@@ -317,15 +317,15 @@ public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);

// Collect stats from all individual Parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
(Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential()
.flatMap(blockMetaData ->
blockMetaData.getColumns().stream()
// Explicitly specify the type before collect since JDK11 struggles to infer it
Stream<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadataStream = metadata.getBlocks().stream().sequential()
.flatMap(blockMetaData ->
blockMetaData.getColumns().stream()
.filter(f -> cols.contains(f.getPath().toDotString()))
.map(columnChunkMetaData -> {
Statistics stats = columnChunkMetaData.getStatistics();
return HoodieColumnRangeMetadata.<Comparable>create(
// Explicitly specify the type since JDK11 struggles to infer it
return (HoodieColumnRangeMetadata<Comparable>) HoodieColumnRangeMetadata.<Comparable>create(
parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
convertToNativeJavaType(
@@ -342,8 +342,11 @@ public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
columnChunkMetaData.getTotalSize(),
columnChunkMetaData.getTotalUncompressedSize());
})
)
.collect(groupingByCollector);
);

// Collect stats from all individual Parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
hoodieColumnRangeMetadataStream.collect(groupingByCollector);

// Combine those into file-level statistics
// NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
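
For reviewers unfamiliar with this javac quirk, the workaround follows one pattern: assign the stream to an explicitly typed local so the element type is fixed before collect(), rather than letting javac 11 infer it through the whole flatMap/map chain. Below is a minimal, self-contained sketch with hypothetical names (ColumnRange stands in for HoodieColumnRangeMetadata; this is not Hudi's API).

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ExplicitStreamTypeSketch {

  // Stand-in for HoodieColumnRangeMetadata<T>.
  static final class ColumnRange<T> {
    private final String columnName;
    private final T minValue;

    private ColumnRange(String columnName, T minValue) {
      this.columnName = columnName;
      this.minValue = minValue;
    }

    static <T> ColumnRange<T> create(String columnName, T minValue) {
      return new ColumnRange<>(columnName, minValue);
    }

    String getColumnName() {
      return columnName;
    }
  }

  @SuppressWarnings("rawtypes")
  public static void main(String[] args) {
    // Stand-in for Parquet block metadata: two "blocks", each listing column names.
    List<List<String>> blocks = Arrays.asList(
        Arrays.asList("a", "b"),
        Arrays.asList("a", "c"));

    Collector<ColumnRange<Comparable>, ?, Map<String, List<ColumnRange<Comparable>>>> groupingByCollector =
        Collectors.groupingBy(ColumnRange::getColumnName);

    // The fix: pin the element type here instead of collecting inline. The explicit type
    // witness on create() mirrors the diff's HoodieColumnRangeMetadata.<Comparable>create(...).
    Stream<ColumnRange<Comparable>> ranges = blocks.stream().sequential()
        .flatMap(block -> block.stream()
            .map(col -> ColumnRange.<Comparable>create(col, col)));

    Map<String, List<ColumnRange<Comparable>>> columnToStatsListMap = ranges.collect(groupingByCollector);
    System.out.println(columnToStatsListMap.keySet());  // a, b, c (order not guaranteed)
  }
}
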
@@ -241,26 +241,28 @@ class ColumnStats {
});

Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector =
Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity());
Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity());

return (Map<String, HoodieColumnRangeMetadata<Comparable>>) targetFields.stream()
// Explicitly specify the type before collect since JDK11 struggles to infer it
Stream<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadataStream = targetFields.stream()
.map(field -> {
ColumnStats colStats = allColumnStats.get(field.name());
return HoodieColumnRangeMetadata.<Comparable>create(
filePath,
field.name(),
colStats == null ? null : coerceToComparable(field.schema(), colStats.minValue),
colStats == null ? null : coerceToComparable(field.schema(), colStats.maxValue),
colStats == null ? 0 : colStats.nullCount,
colStats == null ? 0 : colStats.valueCount,
colStats == null ? 0L : colStats.nullCount,
colStats == null ? 0L : colStats.valueCount,
// NOTE: Size and compressed size statistics are set to 0 to make sure we're not
// mixing up those provided by Parquet with the ones from other encodings,
// since those are not directly comparable
0,
0
0L,
0L
);
})
.collect(collector);
});

return hoodieColumnRangeMetadataStream.collect(collector);
}

/**
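
The ColumnStats path above uses the same workaround with a Collectors.toMap collector, plus a method reference and 0L literals for the counters. A compact, hypothetical sketch of that variant (assuming the factory's count/size parameters are long-typed, as the diff suggests):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ExplicitToMapTypeSketch {

  // Minimal stand-in for HoodieColumnRangeMetadata<T>; only what the sketch needs.
  static final class ColumnRange<T> {
    private final String columnName;
    private final long nullCount;

    private ColumnRange(String columnName, long nullCount) {
      this.columnName = columnName;
      this.nullCount = nullCount;
    }

    static <T> ColumnRange<T> create(String columnName, long nullCount) {
      return new ColumnRange<>(columnName, nullCount);
    }

    String getColumnName() {
      return columnName;
    }
  }

  @SuppressWarnings("rawtypes")
  public static void main(String[] args) {
    List<String> targetFields = Arrays.asList("rider", "driver");

    // Method reference instead of a lambda, as in the diff.
    Collector<ColumnRange<Comparable>, ?, Map<String, ColumnRange<Comparable>>> collector =
        Collectors.toMap(ColumnRange::getColumnName, Function.identity());

    // Explicitly typed stream before collect(), mirroring the change above; the 0L literal
    // matches the long-typed counter parameter assumed in this sketch.
    Stream<ColumnRange<Comparable>> ranges = targetFields.stream()
        .map(field -> ColumnRange.<Comparable>create(field, 0L));

    Map<String, ColumnRange<Comparable>> byColumn = ranges.collect(collector);
    System.out.println(byColumn.keySet());  // rider, driver (order not guaranteed)
  }
}
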
14 changes: 0 additions & 14 deletions hudi-examples/hudi-examples-common/pom.xml
@@ -40,20 +40,6 @@
</resources>

<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
Review comment from @Zouxxyy (PR author), Jul 27, 2023:


This will fail to build on JDK 11; see davidB/scala-maven-plugin#234.

Note: this module does not need the Scala plugin at all, so just remove it, as in #8336.

<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
14 changes: 0 additions & 14 deletions hudi-examples/hudi-examples-java/pom.xml
@@ -59,20 +59,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
@@ -18,7 +18,6 @@

package org.apache.hudi.functional

import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.HoodieTableType
@@ -28,11 +27,10 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.testutils.HoodieSparkClientTestBase

import org.apache.spark.sql._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
@@ -26,9 +26,9 @@ import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.config._
import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.functional.TestCOWDataSourceStorage.{SQL_DRIVER_IS_NOT_NULL, SQL_DRIVER_IS_NULL, SQL_QUERY_EQUALITY_VALIDATOR_CLASS_NAME, SQL_QUERY_INEQUALITY_VALIDATOR_CLASS_NAME, SQL_RIDER_IS_NOT_NULL, SQL_RIDER_IS_NULL}
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, MetadataPartitionType}
import org.apache.hudi.util.JavaConversions

import org.apache.spark.sql._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api._