From 77eb9e58287b0cb1ad142364489eb29989e3bce4 Mon Sep 17 00:00:00 2001 From: muyihao <37872457+muyihao@users.noreply.github.com> Date: Sun, 22 Sep 2024 20:54:49 +0800 Subject: [PATCH] [HUDI-8182] Cache internalSchema for hive read, each split reload active timeline (#11914) * [HUDI-8182] Cache internalSchema for hive read, avoid each split reload active timeline. * fix test failures --------- Co-authored-by: yanghao14 Co-authored-by: danny0405 --- .../hudi/hadoop/SchemaEvolutionContext.java | 101 ++++++++++++++---- .../hive/HoodieCombineHiveInputFormat.java | 65 +++++++++-- .../TestHoodieCombineHiveInputFormat.java | 83 ++++++++++++++ 3 files changed, 221 insertions(+), 28 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java index 9c00fc4bcd5f..a0e2054fb3ff 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java @@ -26,7 +26,9 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader; import org.apache.hudi.hadoop.realtime.RealtimeSplit; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; @@ -36,6 +38,7 @@ import org.apache.hudi.internal.schema.action.InternalSchemaMerger; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; +import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; @@ -63,12 +66,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; @@ -98,22 +104,75 @@ public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException public SchemaEvolutionContext(InputSplit split, JobConf job, Option metaClientOption) throws IOException { this.split = split; this.job = job; - this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient(); - if (this.metaClient == null) { + if (!job.getBoolean(HIVE_EVOLUTION_ENABLE, true)) { + LOG.info("Schema evolution is disabled for split: {}", split); internalSchemaOption = Option.empty(); + this.metaClient = null; return; } + this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient(); + this.internalSchemaOption = getInternalSchemaFromCache(); + } + + public Option getInternalSchemaFromCache() throws IOException { + Option internalSchemaOpt = getCachedData( + HoodieCombineHiveInputFormat.INTERNAL_SCHEMA_CACHE_KEY_PREFIX, + SerDeHelper::fromJson); + if (internalSchemaOpt == null) { + // the code path should only be invoked in tests. 
+      return new TableSchemaResolver(this.metaClient).getTableInternalSchemaFromCommitMetadata();
+    }
+    return internalSchemaOpt;
+  }
+
+  public Schema getAvroSchemaFromCache() throws Exception {
+    Option<Schema> avroSchemaOpt = getCachedData(
+        HoodieCombineHiveInputFormat.SCHEMA_CACHE_KEY_PREFIX,
+        json -> Option.ofNullable(new Schema.Parser().parse(json)));
+    if (avroSchemaOpt == null) {
+      // the code path should only be invoked in tests.
+      return new TableSchemaResolver(this.metaClient).getTableAvroSchema();
+    }
+    return avroSchemaOpt.orElseThrow(() -> new HoodieValidationException("The avro schema cache should always be set up together with the internal schema cache"));
+  }
+
+  /**
+   * Returns the cache data with given key or null if the cache was never set up.
+   */
+  @Nullable
+  private <T> Option<T> getCachedData(String keyPrefix, Function<String, Option<T>> parser) throws IOException {
+    Option<StoragePath> tablePath = getTablePath(job, split);
+    if (!tablePath.isPresent()) {
+      return Option.empty();
+    }
+    String cacheKey = keyPrefix + "." + tablePath.get().toUri();
+    String cachedJson = job.get(cacheKey);
+    if (cachedJson == null) {
+      // the code path should only be invoked in tests.
+      return null;
+    }
+    if (cachedJson.isEmpty()) {
+      return Option.empty();
+    }
     try {
-      TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-      this.internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
+      return parser.apply(cachedJson);
     } catch (Exception e) {
-      internalSchemaOption = Option.empty();
-      LOG.warn(String.format("failed to get internal Schema from hudi table：%s", metaClient.getBasePath()), e);
+      LOG.warn("Failed to parse data from cache with key: {}", cacheKey, e);
+      return Option.empty();
+    }
+  }
+
+  private Option<StoragePath> getTablePath(JobConf job, InputSplit split) throws IOException {
+    if (split instanceof FileSplit) {
+      Path path = ((FileSplit) split).getPath();
+      FileSystem fs = path.getFileSystem(job);
+      HoodieStorage storage = new HoodieHadoopStorage(fs);
+      return TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
     }
-    LOG.info("finish init schema evolution for split: {}", split);
+    return Option.empty();
   }
 
-  private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
+  private HoodieTableMetaClient setUpHoodieTableMetaClient() {
     try {
       Path inputPath = ((FileSplit) split).getPath();
       FileSystem fs = inputPath.getFileSystem(job);
@@ -122,7 +181,7 @@ private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
       return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString())
          .setConf(HadoopFSUtils.getStorageConfWithCopy(job)).build();
     } catch (Exception e) {
-      LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit)split).getPath()), e);
+      LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit) split).getPath()), e);
       return null;
     }
   }
@@ -139,13 +198,13 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
       return;
     }
     if (internalSchemaOption.isPresent()) {
-      Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      Schema tableAvroSchema = getAvroSchemaFromCache();
       List<String> requiredColumns = getRequireColumn(job);
       InternalSchema prunedInternalSchema =
          InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
       // Add partitioning fields to writer schema for resulting row to contain null values for these fields
       String partitionFields = job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
-      List<String> partitioningFields = partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
+      List<String> partitioningFields = !partitionFields.isEmpty() ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
          : new ArrayList<>();
       Schema writerSchema = AvroInternalSchemaConverter.convert(internalSchemaOption.get(), tableAvroSchema.getName());
       writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
@@ -171,7 +230,7 @@ public void doEvolutionForParquetFormat() {
     if (internalSchemaOption.isPresent()) {
       // reading hoodie schema evolution table
       job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
-      Path finalPath = ((FileSplit)split).getPath();
+      Path finalPath = ((FileSplit) split).getPath();
       InternalSchema prunedSchema;
       List<String> requiredColumns = getRequireColumn(job);
       // No need trigger schema evolution for count(*)/count(1) operation
@@ -223,12 +282,12 @@ public void setColumnTypeList(JobConf job, List<Types.Field> fields) {
   private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
     switch (type.typeId()) {
       case RECORD:
-        Types.RecordType record = (Types.RecordType)type;
+        Types.RecordType record = (Types.RecordType) type;
         List<Types.Field> fields = record.fields();
-        ArrayList<TypeInfo> fieldTypes = new ArrayList<>();
-        ArrayList<String> fieldNames = new ArrayList<>();
+        ArrayList<TypeInfo> fieldTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
         for (int index = 0; index < fields.size(); index++) {
-          StructTypeInfo structTypeInfo = (StructTypeInfo)typeInfo;
+          StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
           TypeInfo subTypeInfo = getSchemaSubTypeInfo(structTypeInfo.getAllStructFieldTypeInfos().get(index), fields.get(index).type());
           fieldTypes.add(subTypeInfo);
           String name = fields.get(index).name();
@@ -239,14 +298,14 @@ private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
         structTypeInfo.setAllStructFieldTypeInfos(fieldTypes);
         return structTypeInfo;
       case ARRAY:
-        ListTypeInfo listTypeInfo = (ListTypeInfo)typeInfo;
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
         Types.ArrayType array = (Types.ArrayType) type;
         TypeInfo subTypeInfo = getSchemaSubTypeInfo(listTypeInfo.getListElementTypeInfo(), array.elementType());
         listTypeInfo.setListElementTypeInfo(subTypeInfo);
         return listTypeInfo;
       case MAP:
-        Types.MapType map = (Types.MapType)type;
-        MapTypeInfo mapTypeInfo = (MapTypeInfo)typeInfo;
+        Types.MapType map = (Types.MapType) type;
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
         TypeInfo keyType = getSchemaSubTypeInfo(mapTypeInfo.getMapKeyTypeInfo(), map.keyType());
         TypeInfo valueType = getSchemaSubTypeInfo(mapTypeInfo.getMapValueTypeInfo(), map.valueType());
         MapTypeInfo mapType = new MapTypeInfo();
@@ -296,9 +355,9 @@ private void pushDownFilter(JobConf job, InternalSchema querySchema, InternalSch
     for (int i = 0; i < size; i++) {
       ExprNodeDesc expr = exprNodes.poll();
       if (expr instanceof ExprNodeColumnDesc) {
-        String oldColumn = ((ExprNodeColumnDesc)expr).getColumn();
+        String oldColumn = ((ExprNodeColumnDesc) expr).getColumn();
         String newColumn = InternalSchemaUtils.reBuildFilterName(oldColumn, fileSchema, querySchema);
-        ((ExprNodeColumnDesc)expr).setColumn(newColumn);
+        ((ExprNodeColumnDesc) expr).setColumn(newColumn);
       }
       List<ExprNodeDesc> children = expr.getChildren();
       if (children != null) {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index c5ab69cde9e1..99c9ecd210e4 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -18,13 +18,24 @@
 package org.apache.hudi.hadoop.hive;
 
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.TablePathUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.HoodieParquetInputFormatBase;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -96,7 +107,7 @@
  * CombineHiveInputFormat is a parameterized InputFormat which looks at the path name and determine the correct
  * InputFormat for that path name from mapredPlan.pathToPartitionInfo(). It can be used to read files with different
  * input format in the same map-reduce job.
- *
+ * <p>
* NOTE : This class is implemented to work with Hive 2.x + */ public class HoodieCombineHiveInputFormat @@ -108,6 +119,8 @@ public class HoodieCombineHiveInputFormat uniqTablePaths = new ArrayList<>(); + Arrays.stream(paths).forEach(path -> { + final HoodieStorage storage; + try { + storage = new HoodieHadoopStorage(path.getFileSystem(job)); + Option tablePath = TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path)); + if (tablePath.isPresent()) { + uniqTablePaths.add(tablePath.get().toUri().toString()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + try { + for (String path : uniqTablePaths) { + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(new HadoopStorageConfiguration(job)).build(); + TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); + String avroSchema = schemaUtil.getTableAvroSchema().toString(); + Option internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata(); + if (internalSchema.isPresent()) { + LOG.info("Set internal and avro schema cache with path: " + path); + job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema); + job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, SerDeHelper.toJson(internalSchema.get())); + } else { + // always sets up the cache so that we can distinguish with the scenario where the cache was never set(e.g. in tests). + job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, ""); + job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, ""); + } + } + } catch (Exception e) { + LOG.warn("Fail to set schema cache", e); + } + } + LOG.info("Number of all splits " + result.size()); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return result.toArray(new InputSplit[result.size()]); @@ -390,6 +440,7 @@ private void processPaths(JobConf job, CombineFileInputFormatShim combine, List< /** * HiveFileFormatUtils.getPartitionDescFromPathRecursively is no longer available since Hive 3. * This method is to make it compatible with both Hive 2 and Hive 3. 
+ * * @param pathToPartitionInfo * @param dir * @param cacheMap @@ -397,7 +448,7 @@ private void processPaths(JobConf job, CombineFileInputFormatShim combine, List< * @throws IOException */ private static PartitionDesc getPartitionFromPath(Map pathToPartitionInfo, Path dir, - Map, Map> cacheMap) + Map, Map> cacheMap) throws IOException { Method method; try { @@ -595,7 +646,7 @@ public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throw } public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim, - Map pathToPartitionInfo) throws IOException { + Map pathToPartitionInfo) throws IOException { this.inputSplitShim = inputSplitShim; this.pathToPartitionInfo = pathToPartitionInfo; if (job != null) { @@ -767,7 +818,7 @@ private static class CombinePathInputFormat { private final String deserializerClassName; public CombinePathInputFormat(List> opList, String inputFormatClassName, - String deserializerClassName) { + String deserializerClassName) { this.opList = opList; this.inputFormatClassName = inputFormatClassName; this.deserializerClassName = deserializerClassName; @@ -930,13 +981,13 @@ public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOExcepti int counter = 0; for (int pos = 0; pos < splits.length; pos++) { if (counter == maxSize - 1 || pos == splits.length - 1) { - builder.addSplit((FileSplit)splits[pos]); + builder.addSplit((FileSplit) splits[pos]); combineFileSplits.add(builder.build(job)); builder = new HoodieCombineRealtimeFileSplit.Builder(); counter = 0; } else if (counter < maxSize) { counter++; - builder.addSplit((FileSplit)splits[pos]); + builder.addSplit((FileSplit) splits[pos]); } } return combineFileSplits.toArray(new CombineFileSplit[combineFileSplits.size()]); @@ -963,7 +1014,7 @@ public HadoopShimsSecure.InputSplitShim getInputSplitShim() { @Override public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter, - Class> rrClass) throws IOException { + Class> rrClass) throws IOException { isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false")); if (isRealTime) { List recordReaders = new LinkedList<>(); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java index ab907390f884..e1fd5fc68743 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java @@ -31,8 +31,12 @@ import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.SchemaEvolutionContext; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; @@ -111,6 +115,85 @@ public void tearDown() throws IOException { } } + @Test + public void testInternalSchemaCacheForMR() throws Exception { + // test for HUDI-8182 + StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConf(); + 
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    java.nio.file.Path path1 = tempDir.resolve("tblOne");
+    java.nio.file.Path path2 = tempDir.resolve("tblTwo");
+    HoodieTestUtils.init(conf, path1.toString(), HoodieTableType.MERGE_ON_READ);
+    HoodieTestUtils.init(conf, path2.toString(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    final int numRecords = 10;
+    // Create 3 parquet files with 10 records each for partition 1
+    File partitionDirOne = InputFormatTestUtil.prepareParquetTable(path1, schema, 3, numRecords, commitTime);
+    HoodieCommitMetadata commitMetadataOne = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
+        schema.toString(), HoodieTimeline.COMMIT_ACTION);
+    // mock the latest schema to the commit metadata
+    InternalSchema internalSchema = AvroInternalSchemaConverter.convert(schema);
+    commitMetadataOne.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(internalSchema));
+    FileCreateUtils.createCommit(path1.toString(), commitTime, Option.of(commitMetadataOne));
+    // Create 3 parquet files with 10 records each for partition 2
+    File partitionDirTwo = InputFormatTestUtil.prepareParquetTable(path2, schema, 3, numRecords, commitTime);
+    HoodieCommitMetadata commitMetadataTwo = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
+        schema.toString(), HoodieTimeline.COMMIT_ACTION);
+    // Mock the latest schema to the commit metadata
+    commitMetadataTwo.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(internalSchema));
+    FileCreateUtils.createCommit(path2.toString(), commitTime, Option.of(commitMetadataTwo));
+
+    // Enable schema evolution
+    conf.set("hoodie.schema.on.read.enable", "true");
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    // Set the input format
+    tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
+    ArrayList<String> alias = new ArrayList<>();
+    // Add partition info one
+    alias.add(path1.toAbsolutePath().toString());
+    tableAlias.put(new Path(path1.toAbsolutePath().toString()), alias);
+    pt.put(new Path(path1.toAbsolutePath().toString()), partDesc);
+    // Add partition info two
+    alias.add(path2.toAbsolutePath().toString());
+    tableAlias.put(new Path(path2.toAbsolutePath().toString()), alias);
+    pt.put(new Path(path2.toAbsolutePath().toString()), partDesc);
+
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    mrwork.getMapWork().setPathToAliases(tableAlias);
+    mrwork.getMapWork().setMapperCannotSpanPartns(true);
+    Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+    Utilities.setMapRedWork(conf.unwrap(), mrwork, mapWorkPath);
+    JobConf jobConf = new JobConf(conf.unwrap());
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDirOne.getPath() + "," + partitionDirTwo.getPath());
+    jobConf.set(HAS_MAP_WORK, "true");
+    // The following config tells Hive to choose ExecMapper to read the MAP_WORK
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+    // set SPLIT_MAXSIZE larger to create one split for 3 files groups
+    jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
+    String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
+    InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
+    InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 2);
+    // Check the internal schema and avro is the same as the original one
+    for (InputSplit split : splits) {
+      HoodieCombineRealtimeFileSplit inputSplitShim = (HoodieCombineRealtimeFileSplit) ((HoodieCombineRealtimeHiveSplit) split).getInputSplitShim();
+      List<FileSplit> fileSplits = inputSplitShim.getRealtimeFileSplits();
+      for (FileSplit fileSplit : fileSplits) {
+        SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(fileSplit, jobConf);
+        Option<InternalSchema> internalSchemaFromCache = schemaEvolutionContext.getInternalSchemaFromCache();
+        assertEquals(internalSchemaFromCache.get(), internalSchema);
+        Schema avroSchemaFromCache = schemaEvolutionContext.getAvroSchemaFromCache();
+        assertEquals(avroSchemaFromCache, schema);
+      }
+    }
+  }
+
   @Test
   public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception {
     // test for HUDI-1718