From 0e785c56fcfc4dcfce8a56da278c5ab434b54af0 Mon Sep 17 00:00:00 2001
From: yanghao14
Date: Sun, 1 Sep 2024 23:37:58 +0800
Subject: [PATCH 1/2] [HUDI-8182] Cache internalSchema for Hive reads to avoid reloading the active timeline for each split.

---
 ...FileBasedInternalSchemaStorageManager.java |  3 +-
 .../hudi/hadoop/SchemaEvolutionContext.java   | 84 ++++++++++++++-----
 .../hive/HoodieCombineHiveInputFormat.java    | 61 ++++++++++++--
 .../TestHoodieCombineHiveInputFormat.java     | 83 ++++++++++++++++++
 4 files changed, 203 insertions(+), 28 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
index 345bd3d35110..20ffec1d176c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
@@ -67,7 +67,8 @@ public FileBasedInternalSchemaStorageManager(HoodieStorage storage, StoragePath
   public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient metaClient) {
     this.baseSchemaPath = new StoragePath(metaClient.getMetaPath(), SCHEMA_NAME);
     this.storage = metaClient.getStorage();
-    this.metaClient = metaClient;
+    // the history schema files should be located into .hoodie/.schema folder.
+    this.metaClient = metaClient.getBasePath().getName().equalsIgnoreCase(SCHEMA_NAME) ? metaClient : null;
   }
 
   // make metaClient build lazy
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index 9c00fc4bcd5f..d7ac8f30dcf3 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -20,13 +20,13 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.TablePathUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
 import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
 import org.apache.hudi.hadoop.realtime.RealtimeSplit;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -36,6 +36,7 @@
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
@@ -69,6 +70,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
@@ -98,19 +100,61 @@ public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException
   public SchemaEvolutionContext(InputSplit split, JobConf job, Option<HoodieTableMetaClient> metaClientOption)
       throws IOException {
     this.split = split;
     this.job = job;
-    this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
-    if (this.metaClient == null) {
+    if (!job.getBoolean(HIVE_EVOLUTION_ENABLE, true)) {
+      LOG.info("schema evolution is disabled for split: {}", split);
       internalSchemaOption = Option.empty();
+      this.metaClient = null;
       return;
     }
+    this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
+    this.internalSchemaOption = getInternalSchemaFromCache();
+  }
+
+  public Option<InternalSchema> getInternalSchemaFromCache() throws IOException {
+    Option<InternalSchema> internalSchema = getCachedData(
+        HoodieCombineHiveInputFormat.INTERNAL_SCHEMA_CACHE_KEY_PREFIX,
+        json -> SerDeHelper.fromJson(json)
+    );
+    if (internalSchema != null && internalSchema.isPresent()) {
+      return internalSchema;
+    }
+    return Option.empty();
+  }
+
+  public Schema getAvroSchemaFromCache() throws IOException {
+    Schema avroSchema = getCachedData(
+        HoodieCombineHiveInputFormat.SCHEMA_CACHE_KEY_PREFIX,
+        json -> new Schema.Parser().parse(json)
+    );
+    return avroSchema;
+  }
+
+  private <T> T getCachedData(String keyPrefix, Function<String, T> parser) throws IOException {
+    Option<StoragePath> tablePath = getTablePath(job, split);
+    if (!tablePath.isPresent()) {
+      return null;
+    }
+    String cacheKey = keyPrefix + "." + tablePath.get().toUri();
+    String cachedJson = job.get(cacheKey);
+    if (cachedJson == null) {
+      return null;
+    }
     try {
-      TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-      this.internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
+      return parser.apply(cachedJson);
     } catch (Exception e) {
-      internalSchemaOption = Option.empty();
-      LOG.warn(String.format("failed to get internal Schema from hudi tableļ¼š%s", metaClient.getBasePath()), e);
+      LOG.warn(String.format("Failed to parse data from cache with key: %s", cacheKey), e);
+      return null;
+    }
+  }
+
+  private Option<StoragePath> getTablePath(JobConf job, InputSplit split) throws IOException {
+    if (split instanceof FileSplit) {
+      Path path = ((FileSplit) split).getPath();
+      FileSystem fs = path.getFileSystem(job);
+      HoodieStorage storage = new HoodieHadoopStorage(fs);
+      return TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
     }
-    LOG.info("finish init schema evolution for split: {}", split);
+    return Option.empty();
   }
 
   private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
@@ -122,7 +166,7 @@ private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
       return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString())
           .setConf(HadoopFSUtils.getStorageConfWithCopy(job)).build();
     } catch (Exception e) {
-      LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit)split).getPath()), e);
+      LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit) split).getPath()), e);
       return null;
     }
   }
@@ -139,7 +183,7 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
       return;
     }
     if (internalSchemaOption.isPresent()) {
-      Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      Schema tableAvroSchema = getAvroSchemaFromCache();
       List<String> requiredColumns = getRequireColumn(job);
       InternalSchema prunedInternalSchema =
           InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
@@ -171,7 +215,7 @@ public void doEvolutionForParquetFormat() {
     if (internalSchemaOption.isPresent()) {
       // reading hoodie schema
evolution table job.setBoolean(HIVE_EVOLUTION_ENABLE, true); - Path finalPath = ((FileSplit)split).getPath(); + Path finalPath = ((FileSplit) split).getPath(); InternalSchema prunedSchema; List requiredColumns = getRequireColumn(job); // No need trigger schema evolution for count(*)/count(1) operation @@ -223,12 +267,12 @@ public void setColumnTypeList(JobConf job, List fields) { private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) { switch (type.typeId()) { case RECORD: - Types.RecordType record = (Types.RecordType)type; + Types.RecordType record = (Types.RecordType) type; List fields = record.fields(); - ArrayList fieldTypes = new ArrayList<>(); - ArrayList fieldNames = new ArrayList<>(); + ArrayList fieldTypes = new ArrayList<>(); + ArrayList fieldNames = new ArrayList<>(); for (int index = 0; index < fields.size(); index++) { - StructTypeInfo structTypeInfo = (StructTypeInfo)typeInfo; + StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; TypeInfo subTypeInfo = getSchemaSubTypeInfo(structTypeInfo.getAllStructFieldTypeInfos().get(index), fields.get(index).type()); fieldTypes.add(subTypeInfo); String name = fields.get(index).name(); @@ -239,14 +283,14 @@ private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) { structTypeInfo.setAllStructFieldTypeInfos(fieldTypes); return structTypeInfo; case ARRAY: - ListTypeInfo listTypeInfo = (ListTypeInfo)typeInfo; + ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo; Types.ArrayType array = (Types.ArrayType) type; TypeInfo subTypeInfo = getSchemaSubTypeInfo(listTypeInfo.getListElementTypeInfo(), array.elementType()); listTypeInfo.setListElementTypeInfo(subTypeInfo); return listTypeInfo; case MAP: - Types.MapType map = (Types.MapType)type; - MapTypeInfo mapTypeInfo = (MapTypeInfo)typeInfo; + Types.MapType map = (Types.MapType) type; + MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo; TypeInfo keyType = getSchemaSubTypeInfo(mapTypeInfo.getMapKeyTypeInfo(), map.keyType()); TypeInfo valueType = getSchemaSubTypeInfo(mapTypeInfo.getMapValueTypeInfo(), map.valueType()); MapTypeInfo mapType = new MapTypeInfo(); @@ -296,9 +340,9 @@ private void pushDownFilter(JobConf job, InternalSchema querySchema, InternalSch for (int i = 0; i < size; i++) { ExprNodeDesc expr = exprNodes.poll(); if (expr instanceof ExprNodeColumnDesc) { - String oldColumn = ((ExprNodeColumnDesc)expr).getColumn(); + String oldColumn = ((ExprNodeColumnDesc) expr).getColumn(); String newColumn = InternalSchemaUtils.reBuildFilterName(oldColumn, fileSchema, querySchema); - ((ExprNodeColumnDesc)expr).setColumn(newColumn); + ((ExprNodeColumnDesc) expr).setColumn(newColumn); } List children = expr.getChildren(); if (children != null) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index a122f59d03b8..2cdcf1212097 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -18,13 +18,24 @@ package org.apache.hudi.hadoop.hive; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.common.util.ValidationUtils; import 
org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.HoodieParquetInputFormatBase; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -96,7 +107,7 @@ * CombineHiveInputFormat is a parameterized InputFormat which looks at the path name and determine the correct * InputFormat for that path name from mapredPlan.pathToPartitionInfo(). It can be used to read files with different * input format in the same map-reduce job. - * + *

* NOTE : This class is implemented to work with Hive 2.x + */ public class HoodieCombineHiveInputFormat @@ -108,6 +119,8 @@ public class HoodieCombineHiveInputFormat 0) { + ArrayList uniqTablePaths = new ArrayList<>(); + Arrays.stream(paths).forEach(path -> { + HoodieStorage storage = null; + try { + storage = new HoodieHadoopStorage(path.getFileSystem(job)); + Option tablePath = TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path)); + if (tablePath.isPresent()) { + uniqTablePaths.add(tablePath.get().toUri().toString()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + try { + for (String path : uniqTablePaths) { + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(new HadoopStorageConfiguration(job)).build(); + TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); + String avroSchema = schemaUtil.getTableAvroSchema().toString(); + Option internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata(); + if (internalSchema.isPresent()) { + LOG.info("Set internal internalSchema and avro schema of path: " + path.toString()); + job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema); + job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, SerDeHelper.toJson(internalSchema.get())); + } + } + } catch (Exception e) { + LOG.warn("Fail to set internal schema", e); + } + } + LOG.info("Number of all splits " + result.size()); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return result.toArray(new InputSplit[result.size()]); @@ -390,6 +436,7 @@ private void processPaths(JobConf job, CombineFileInputFormatShim combine, List< /** * HiveFileFormatUtils.getPartitionDescFromPathRecursively is no longer available since Hive 3. * This method is to make it compatible with both Hive 2 and Hive 3. 
+ * * @param pathToPartitionInfo * @param dir * @param cacheMap @@ -397,7 +444,7 @@ private void processPaths(JobConf job, CombineFileInputFormatShim combine, List< * @throws IOException */ private static PartitionDesc getPartitionFromPath(Map pathToPartitionInfo, Path dir, - Map, Map> cacheMap) + Map, Map> cacheMap) throws IOException { Method method; try { @@ -594,7 +641,7 @@ public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throw } public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim, - Map pathToPartitionInfo) throws IOException { + Map pathToPartitionInfo) throws IOException { this.inputSplitShim = inputSplitShim; this.pathToPartitionInfo = pathToPartitionInfo; if (job != null) { @@ -766,7 +813,7 @@ private static class CombinePathInputFormat { private final String deserializerClassName; public CombinePathInputFormat(List> opList, String inputFormatClassName, - String deserializerClassName) { + String deserializerClassName) { this.opList = opList; this.inputFormatClassName = inputFormatClassName; this.deserializerClassName = deserializerClassName; @@ -929,13 +976,13 @@ public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOExcepti int counter = 0; for (int pos = 0; pos < splits.length; pos++) { if (counter == maxSize - 1 || pos == splits.length - 1) { - builder.addSplit((FileSplit)splits[pos]); + builder.addSplit((FileSplit) splits[pos]); combineFileSplits.add(builder.build(job)); builder = new HoodieCombineRealtimeFileSplit.Builder(); counter = 0; } else if (counter < maxSize) { counter++; - builder.addSplit((FileSplit)splits[pos]); + builder.addSplit((FileSplit) splits[pos]); } } return combineFileSplits.toArray(new CombineFileSplit[combineFileSplits.size()]); @@ -962,7 +1009,7 @@ public HadoopShimsSecure.InputSplitShim getInputSplitShim() { @Override public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter, - Class> rrClass) throws IOException { + Class> rrClass) throws IOException { isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false")); if (isRealTime) { List recordReaders = new LinkedList<>(); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java index ab907390f884..e1fd5fc68743 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java @@ -31,8 +31,12 @@ import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.SchemaEvolutionContext; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; @@ -111,6 +115,85 @@ public void tearDown() throws IOException { } } + @Test + public void testInternalSchemaCacheForMR() throws Exception { + // test for HUDI-8182 + StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConf(); + 
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); + java.nio.file.Path path1 = tempDir.resolve("tblOne"); + java.nio.file.Path path2 = tempDir.resolve("tblTwo"); + HoodieTestUtils.init(conf, path1.toString(), HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(conf, path2.toString(), HoodieTableType.MERGE_ON_READ); + String commitTime = "100"; + final int numRecords = 10; + // Create 3 parquet files with 10 records each for partition 1 + File partitionDirOne = InputFormatTestUtil.prepareParquetTable(path1, schema, 3, numRecords, commitTime); + HoodieCommitMetadata commitMetadataOne = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + // mock the latest schema to the commit metadata + InternalSchema internalSchema = AvroInternalSchemaConverter.convert(schema); + commitMetadataOne.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(internalSchema)); + FileCreateUtils.createCommit(path1.toString(), commitTime, Option.of(commitMetadataOne)); + // Create 3 parquet files with 10 records each for partition 2 + File partitionDirTwo = InputFormatTestUtil.prepareParquetTable(path2, schema, 3, numRecords, commitTime); + HoodieCommitMetadata commitMetadataTwo = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + // Mock the latest schema to the commit metadata + commitMetadataTwo.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(internalSchema)); + FileCreateUtils.createCommit(path2.toString(), commitTime, Option.of(commitMetadataTwo)); + + // Enable schema evolution + conf.set("hoodie.schema.on.read.enable", "true"); + + TableDesc tblDesc = Utilities.defaultTd; + // Set the input format + tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class); + PartitionDesc partDesc = new PartitionDesc(tblDesc, null); + LinkedHashMap pt = new LinkedHashMap<>(); + LinkedHashMap> tableAlias = new LinkedHashMap<>(); + ArrayList alias = new ArrayList<>(); + // Add partition info one + alias.add(path1.toAbsolutePath().toString()); + tableAlias.put(new Path(path1.toAbsolutePath().toString()), alias); + pt.put(new Path(path1.toAbsolutePath().toString()), partDesc); + // Add partition info two + alias.add(path2.toAbsolutePath().toString()); + tableAlias.put(new Path(path2.toAbsolutePath().toString()), alias); + pt.put(new Path(path2.toAbsolutePath().toString()), partDesc); + + MapredWork mrwork = new MapredWork(); + mrwork.getMapWork().setPathToPartitionInfo(pt); + mrwork.getMapWork().setPathToAliases(tableAlias); + mrwork.getMapWork().setMapperCannotSpanPartns(true); + Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString()); + Utilities.setMapRedWork(conf.unwrap(), mrwork, mapWorkPath); + JobConf jobConf = new JobConf(conf.unwrap()); + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDirOne.getPath() + "," + partitionDirTwo.getPath()); + jobConf.set(HAS_MAP_WORK, "true"); + // The following config tells Hive to choose ExecMapper to read the MAP_WORK + jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName()); + // set SPLIT_MAXSIZE larger to create one split for 3 files groups + jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000"); + + HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat(); + String 
tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double"; + InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes); + InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 2); + // Check the internal schema and avro is the same as the original one + for (InputSplit split : splits) { + HoodieCombineRealtimeFileSplit inputSplitShim = (HoodieCombineRealtimeFileSplit) ((HoodieCombineRealtimeHiveSplit) split).getInputSplitShim(); + List fileSplits = inputSplitShim.getRealtimeFileSplits(); + for (FileSplit fileSplit : fileSplits) { + SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(fileSplit, jobConf); + Option internalSchemaFromCache = schemaEvolutionContext.getInternalSchemaFromCache(); + assertEquals(internalSchemaFromCache.get(), internalSchema); + Schema avroSchemaFromCache = schemaEvolutionContext.getAvroSchemaFromCache(); + assertEquals(avroSchemaFromCache, schema); + } + } + } + @Test public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception { // test for HUDI-1718 From 2d848295b63cfddbe116034438ab4797fd98e465 Mon Sep 17 00:00:00 2001 From: danny0405 Date: Sun, 22 Sep 2024 11:09:09 +0800 Subject: [PATCH 2/2] fix test failures --- ...FileBasedInternalSchemaStorageManager.java | 3 +- .../hudi/hadoop/SchemaEvolutionContext.java | 51 ++++++++++++------- .../hive/HoodieCombineHiveInputFormat.java | 12 +++-- 3 files changed, 42 insertions(+), 24 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java index 20ffec1d176c..345bd3d35110 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java @@ -67,8 +67,7 @@ public FileBasedInternalSchemaStorageManager(HoodieStorage storage, StoragePath public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient metaClient) { this.baseSchemaPath = new StoragePath(metaClient.getMetaPath(), SCHEMA_NAME); this.storage = metaClient.getStorage(); - // the history schema files should be located into .hoodie/.schema folder. - this.metaClient = metaClient.getBasePath().getName().equalsIgnoreCase(SCHEMA_NAME) ? 
metaClient : null; + this.metaClient = metaClient; } // make metaClient build lazy diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java index d7ac8f30dcf3..a0e2054fb3ff 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java @@ -20,11 +20,13 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader; @@ -64,6 +66,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -101,7 +105,7 @@ public SchemaEvolutionContext(InputSplit split, JobConf job, Option getInternalSchemaFromCache() throws IOException { - Option internalSchema = getCachedData( + Option internalSchemaOpt = getCachedData( HoodieCombineHiveInputFormat.INTERNAL_SCHEMA_CACHE_KEY_PREFIX, - json -> SerDeHelper.fromJson(json) - ); - if (internalSchema != null && internalSchema.isPresent()) { - return internalSchema; + SerDeHelper::fromJson); + if (internalSchemaOpt == null) { + // the code path should only be invoked in tests. + return new TableSchemaResolver(this.metaClient).getTableInternalSchemaFromCommitMetadata(); } - return Option.empty(); + return internalSchemaOpt; } - public Schema getAvroSchemaFromCache() throws IOException { - Schema avroSchema = getCachedData( + public Schema getAvroSchemaFromCache() throws Exception { + Option avroSchemaOpt = getCachedData( HoodieCombineHiveInputFormat.SCHEMA_CACHE_KEY_PREFIX, - json -> new Schema.Parser().parse(json) - ); - return avroSchema; + json -> Option.ofNullable(new Schema.Parser().parse(json))); + if (avroSchemaOpt == null) { + // the code path should only be invoked in tests. + return new TableSchemaResolver(this.metaClient).getTableAvroSchema(); + } + return avroSchemaOpt.orElseThrow(() -> new HoodieValidationException("The avro schema cache should always be set up together with the internal schema cache")); } - private T getCachedData(String keyPrefix, Function parser) throws IOException { + /** + * Returns the cache data with given key or null if the cache was never set up. + */ + @Nullable + private Option getCachedData(String keyPrefix, Function> parser) throws IOException { Option tablePath = getTablePath(job, split); if (!tablePath.isPresent()) { - return null; + return Option.empty(); } String cacheKey = keyPrefix + "." + tablePath.get().toUri(); String cachedJson = job.get(cacheKey); if (cachedJson == null) { + // the code path should only be invoked in tests. 
return null; } + if (cachedJson.isEmpty()) { + return Option.empty(); + } try { return parser.apply(cachedJson); } catch (Exception e) { - LOG.warn(String.format("Failed to parse data from cache with key: %s", cacheKey), e); - return null; + LOG.warn("Failed to parse data from cache with key: {}", cacheKey, e); + return Option.empty(); } } @@ -157,7 +172,7 @@ private Option getTablePath(JobConf job, InputSplit split) throws I return Option.empty(); } - private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException { + private HoodieTableMetaClient setUpHoodieTableMetaClient() { try { Path inputPath = ((FileSplit) split).getPath(); FileSystem fs = inputPath.getFileSystem(job); @@ -189,7 +204,7 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt requiredColumns); // Add partitioning fields to writer schema for resulting row to contain null values for these fields String partitionFields = job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); - List partitioningFields = partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()) + List partitioningFields = !partitionFields.isEmpty() ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()) : new ArrayList<>(); Schema writerSchema = AvroInternalSchemaConverter.convert(internalSchemaOption.get(), tableAvroSchema.getName()); writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index 2cdcf1212097..8387281cebb4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -389,10 +389,10 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { Utilities.clearWorkMapForConf(job); // build internal schema for the query - if (result.size() > 0) { + if (!result.isEmpty()) { ArrayList uniqTablePaths = new ArrayList<>(); Arrays.stream(paths).forEach(path -> { - HoodieStorage storage = null; + final HoodieStorage storage; try { storage = new HoodieHadoopStorage(path.getFileSystem(job)); Option tablePath = TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path)); @@ -411,13 +411,17 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { String avroSchema = schemaUtil.getTableAvroSchema().toString(); Option internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata(); if (internalSchema.isPresent()) { - LOG.info("Set internal internalSchema and avro schema of path: " + path.toString()); + LOG.info("Set internal and avro schema cache with path: " + path); job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema); job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, SerDeHelper.toJson(internalSchema.get())); + } else { + // always sets up the cache so that we can distinguish with the scenario where the cache was never set(e.g. in tests). + job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, ""); + job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, ""); } } } catch (Exception e) { - LOG.warn("Fail to set internal schema", e); + LOG.warn("Fail to set schema cache", e); } }
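
The core idea in this series is a planner-side/task-side handshake over the JobConf: HoodieCombineHiveInputFormat#getSplits resolves each table's Avro schema and InternalSchema once and stores them under per-table-path keys, and SchemaEvolutionContext later reads those keys per split instead of building a TableSchemaResolver against the active timeline. The sketch below illustrates that contract in isolation, including the empty-string sentinel introduced in the second patch. It is a minimal sketch, not the Hudi implementation: it assumes only hadoop-common on the classpath, the key prefixes and helper names are illustrative placeholders (the real constants are SCHEMA_CACHE_KEY_PREFIX and INTERNAL_SCHEMA_CACHE_KEY_PREFIX in HoodieCombineHiveInputFormat), and the schema payloads are plain JSON strings.

import org.apache.hadoop.mapred.JobConf;

import java.util.Optional;

public class SchemaCacheSketch {

  // Hypothetical key prefixes for illustration only; the patch uses the constants
  // defined in HoodieCombineHiveInputFormat, whose values are not shown here.
  static final String AVRO_KEY_PREFIX = "example.avro.schema.cache";
  static final String INTERNAL_KEY_PREFIX = "example.internal.schema.cache";

  /** Planner side (getSplits): resolve schemas once per table and stash them in the JobConf. */
  static void populateCache(JobConf job, String tableBasePath, String avroSchemaJson,
                            Optional<String> internalSchemaJson) {
    if (internalSchemaJson.isPresent()) {
      job.set(AVRO_KEY_PREFIX + "." + tableBasePath, avroSchemaJson);
      job.set(INTERNAL_KEY_PREFIX + "." + tableBasePath, internalSchemaJson.get());
    } else {
      // Patch 2 refinement: always write the keys, using "" as a sentinel, so readers can
      // distinguish "this table has no internal schema" from "the cache was never populated".
      job.set(AVRO_KEY_PREFIX + "." + tableBasePath, "");
      job.set(INTERNAL_KEY_PREFIX + "." + tableBasePath, "");
    }
  }

  /** Task side (per split): read the cached value instead of reloading the active timeline. */
  static Optional<String> readInternalSchema(JobConf job, String tableBasePath) {
    String cached = job.get(INTERNAL_KEY_PREFIX + "." + tableBasePath);
    if (cached == null) {
      // Cache never set, i.e. a code path that bypassed getSplits; this is where the patch
      // falls back to TableSchemaResolver against the meta client (expected only in tests).
      return Optional.empty();
    }
    if (cached.isEmpty()) {
      // Sentinel value: no internal schema for this table, skip schema evolution with no extra I/O.
      return Optional.empty();
    }
    return Optional.of(cached);
  }

  public static void main(String[] args) {
    JobConf job = new JobConf();
    populateCache(job, "hdfs://nn/warehouse/db/tbl", "{\"type\":\"record\"}", Optional.of("{\"schemas\":[]}"));
    // Prints true: the per-split reader sees the value cached at split-planning time.
    System.out.println(readInternalSchema(job, "hdfs://nn/warehouse/db/tbl").isPresent());
  }
}

The sentinel is the design choice that makes the fallback safe: per the comments added in the second patch, the TableSchemaResolver fallback is only expected on code paths that never ran getSplits (e.g. tests), while production reads always hit the JobConf cache, whether or not the table carries an internal schema.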