Fix compilation errors in JDK11
Zouxxyy committed Jul 29, 2023
1 parent 64310f5 commit dbad787
Showing 9 changed files with 44 additions and 68 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/bot.yml
@@ -126,10 +126,10 @@ jobs:

steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '8'
java-version: '11'
distribution: 'adopt'
architecture: x64
- name: Build Project
@@ -72,8 +72,9 @@
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -685,9 +686,9 @@ public void testHandleUpdateWithMultiplePartitions() throws Exception {
BaseSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context(), cfg, hoodieTable,
newDeleteTime, HoodieJavaRDD.of(deleteRDD));
actionExecutor.getUpsertPartitioner(new WorkloadProfile(buildProfile(deleteRDD)));
final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator());
}).map(Transformations::flatten).collect();
final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Collections.singletonList(1))
.map(x -> (Iterator<List<WriteStatus>>) actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator()))
.map(Transformations::flatten).collect();

// Verify there are errors because records are from multiple partitions (but handleUpdate is invoked for
// specific partition)
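Note: the workaround above — building the single-element RDD with Collections.singletonList and casting the lambda body to Iterator<List<WriteStatus>> — recurs in the next test file as well. Below is a minimal, self-contained sketch of the idea (LambdaInferenceSketch, mapOnce and produceStatuses are hypothetical stand-ins, not Hudi code): when a generic result type has to flow through a lambda and a raw-typed call, an explicit cast gives the compiler a concrete element type instead of letting the chain degrade to raw types.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class LambdaInferenceSketch {

    // Shaped like JavaRDD.map: the result type R is inferred from the lambda.
    static <T, R> R mapOnce(T value, Function<T, R> fn) {
        return fn.apply(value);
    }

    // Stand-in for handleUpdate(...): the compiler only sees a raw Iterator here.
    @SuppressWarnings("rawtypes")
    static Iterator produceStatuses() {
        return Collections.singletonList(Collections.<String>emptyList()).iterator();
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // Without the cast, R is inferred as the raw Iterator; the explicit cast
        // pins R to the intended Iterator<List<String>> and keeps the result typed.
        Iterator<List<String>> typed =
                mapOnce(1, x -> (Iterator<List<String>>) produceStatuses());
        System.out.println(typed.hasNext());
    }
}
```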
@@ -78,7 +78,9 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -463,9 +465,9 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
BaseSparkCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(context, config, table,
instantTime, context.parallelize(inserts));
final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator());
}).map(Transformations::flatten).collect();
final List<List<WriteStatus>> ws = jsc.parallelize(Collections.singletonList(1))
.map(x -> (Iterator<List<WriteStatus>>) actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator()))
.map(Transformations::flatten).collect();

WriteStatus writeStatus = ws.get(0).get(0);
String fileId = writeStatus.getFileId();
@@ -477,9 +479,9 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, HoodieTableMetaClient.reload(metaClient));
BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
instantTime, context.parallelize(updates));
final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
}).map(Transformations::flatten).collect();
final List<List<WriteStatus>> updateStatus = jsc.parallelize(Collections.singletonList(1))
.map(x -> (Iterator<List<WriteStatus>>) newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator()))
.map(Transformations::flatten).collect();
assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
}

@@ -310,15 +310,15 @@ public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);

// Collect stats from all individual Parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
(Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential()
.flatMap(blockMetaData ->
blockMetaData.getColumns().stream()
// Explicitly specify the type before collect since JDK11 struggles to infer it
Stream<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadataStream = metadata.getBlocks().stream().sequential()
.flatMap(blockMetaData ->
blockMetaData.getColumns().stream()
.filter(f -> cols.contains(f.getPath().toDotString()))
.map(columnChunkMetaData -> {
Statistics stats = columnChunkMetaData.getStatistics();
return HoodieColumnRangeMetadata.<Comparable>create(
// Explicitly specify the type since JDK11 struggles to infer it
return (HoodieColumnRangeMetadata<Comparable>) HoodieColumnRangeMetadata.<Comparable>create(
parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
convertToNativeJavaType(
@@ -335,8 +335,11 @@ public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
columnChunkMetaData.getTotalSize(),
columnChunkMetaData.getTotalUncompressedSize());
})
)
.collect(groupingByCollector);
);

// Collect stats from all individual Parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
hoodieColumnRangeMetadataStream.collect(groupingByCollector);

// Combine those into file-level statistics
// NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
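Note: this change, and the analogous one in the next file, follow the same recipe: bind the intermediate Stream<HoodieColumnRangeMetadata<Comparable>> to an explicitly typed local and collect from that, rather than casting the result of one long fluent chain. A simplified sketch of the pattern (Range and group below are hypothetical stand-ins, not Hudi APIs):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectInferenceSketch {

    // Hypothetical stand-in for HoodieColumnRangeMetadata<T>.
    static final class Range<T extends Comparable<T>> {
        private final String columnName;

        Range(String columnName) {
            this.columnName = columnName;
        }

        String getColumnName() {
            return columnName;
        }
    }

    static Map<String, List<Range<Integer>>> group(List<String> columns) {
        // Bind the stream to a local with an explicit element type first, then
        // collect — mirroring the hoodieColumnRangeMetadataStream variable above
        // instead of a cast on the result of collect().
        Stream<Range<Integer>> ranges = columns.stream().map(Range::new);
        return ranges.collect(Collectors.groupingBy(Range::getColumnName));
    }

    public static void main(String[] args) {
        System.out.println(group(Arrays.asList("a", "b", "a")).keySet());
    }
}
```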
@@ -218,26 +218,28 @@ class ColumnStats {
});

Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector =
Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity());
Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity());

return (Map<String, HoodieColumnRangeMetadata<Comparable>>) targetFields.stream()
// Explicitly specify the type before collect since JDK11 struggles to infer it
Stream<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadataStream = targetFields.stream()
.map(field -> {
ColumnStats colStats = allColumnStats.get(field.name());
return HoodieColumnRangeMetadata.<Comparable>create(
filePath,
field.name(),
colStats == null ? null : coerceToComparable(field.schema(), colStats.minValue),
colStats == null ? null : coerceToComparable(field.schema(), colStats.maxValue),
colStats == null ? 0 : colStats.nullCount,
colStats == null ? 0 : colStats.valueCount,
colStats == null ? 0L : colStats.nullCount,
colStats == null ? 0L : colStats.valueCount,
// NOTE: Size and compressed size statistics are set to 0 to make sure we're not
// mixing up those provided by Parquet with the ones from other encodings,
// since those are not directly comparable
0,
0
0L,
0L
);
})
.collect(collector);
});

return hoodieColumnRangeMetadataStream.collect(collector);
}

/**
14 changes: 0 additions & 14 deletions hudi-examples/hudi-examples-common/pom.xml
@@ -40,20 +40,6 @@
</resources>

<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
14 changes: 0 additions & 14 deletions hudi-examples/hudi-examples-java/pom.xml
@@ -59,20 +59,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
@@ -18,24 +18,21 @@

package org.apache.hudi.functional

import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.testutils.HoodieSparkClientTestBase

import org.apache.spark.sql._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource

import java.util.concurrent.atomic.AtomicInteger
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable

@@ -158,8 +155,7 @@ class TestMetadataRecordIndex extends HoodieSparkClientTestBase {
}

private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
val props = new Properties()
props.putAll(hudiOpts.asJava)
val props = TypedProperties.fromMap(hudiOpts.asJava)
HoodieWriteConfig.newBuilder()
.withProps(props)
.withPath(basePath)
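Note: the same substitution appears once more in the last file below. Replacing new Properties() plus putAll with TypedProperties.fromMap presumably sidesteps a known scalac pitfall on JDK 9+, where java.util.Properties overrides putAll and the call becomes ambiguous between the Properties and Hashtable definitions. A rough Java equivalent of the new helper, assuming the builder chain ends with .build() (the diff cuts the chain off here):

```java
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.Map;
import java.util.Properties;

public class WriteConfigSketch {

    // Mirrors getWriteConfig(...) from the Scala tests above; hudiOpts plays the
    // role of the Scala option map, and .build() is an assumption not shown in
    // the diff.
    static HoodieWriteConfig buildWriteConfig(String basePath, Map<String, String> hudiOpts) {
        Properties props = TypedProperties.fromMap(hudiOpts);
        return HoodieWriteConfig.newBuilder()
                .withProps(props)
                .withPath(basePath)
                .build();
    }
}
```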
@@ -18,13 +18,12 @@

package org.apache.hudi.functional

import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.MetadataConversionUtils
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
@@ -33,16 +33,18 @@ import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, Hoodie
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JavaConversions

import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, not}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource}

import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import scala.collection.{JavaConverters, mutable}
import scala.util.Using
@@ -577,8 +578,7 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
}

private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
val props = new Properties()
props.putAll(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava)
val props = TypedProperties.fromMap(hudiOpts.asJava)
HoodieWriteConfig.newBuilder()
.withProps(props)
.withPath(basePath)
