
Commit

[HUDI-8182] Cache internalSchema for hive read, each split reload active timeline (#11914)

* [HUDI-8182] Cache internalSchema for hive read, to avoid reloading the active timeline for each split.

* fix test failures

---------

Co-authored-by: yanghao14 <[email protected]>
Co-authored-by: danny0405 <[email protected]>
3 people committed Sep 22, 2024
1 parent 9c54838 commit 77eb9e5
Showing 3 changed files with 221 additions and 28 deletions.
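In short: while computing splits, the driver resolves each Hudi table's schema once and publishes it in the job configuration under a per-table key, and every split then reads that cached value instead of reloading the active timeline. Below is a minimal, self-contained Java sketch of that pattern; SchemaCacheDemo, the Map-backed conf, and the publish/lookup helpers are illustrative stand-ins, not Hudi APIs (the real code works on the JobConf via HoodieCombineHiveInputFormat#getSplits and SchemaEvolutionContext, as the diffs below show).

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative stand-in for the schema caching this commit introduces.
public class SchemaCacheDemo {

  // Same prefix the commit adds to HoodieCombineHiveInputFormat.
  static final String INTERNAL_SCHEMA_CACHE_KEY_PREFIX = "hudi.hive.internal.schema.cache.key.prefix";

  // Driver side (getSplits): resolve the schema once per table and publish it in the conf.
  // An empty value means "resolved, but the table has no internal schema".
  static void publish(Map<String, String> conf, String tablePath, Optional<String> internalSchemaJson) {
    conf.put(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + tablePath, internalSchemaJson.orElse(""));
  }

  // Task side (per split): read the cached value; a missing key means the cache was never
  // populated (e.g. some test paths), in which case the real code falls back to the meta client.
  static Optional<String> lookup(Map<String, String> conf, String tablePath) {
    String cached = conf.get(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + tablePath);
    return (cached == null || cached.isEmpty()) ? Optional.empty() : Optional.of(cached);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    publish(conf, "hdfs://warehouse/db/tbl", Optional.of("{\"type\":\"record\"}"));
    System.out.println(lookup(conf, "hdfs://warehouse/db/tbl")); // Optional[{"type":"record"}]
  }
}

The real implementation also caches the table's Avro schema under a second prefix and distinguishes a cache that was never populated from a table that simply has no internal schema.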
SchemaEvolutionContext.java
@@ -26,7 +26,9 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -36,6 +38,7 @@
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
@@ -63,12 +66,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
@@ -98,22 +104,75 @@ public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException
public SchemaEvolutionContext(InputSplit split, JobConf job, Option<HoodieTableMetaClient> metaClientOption) throws IOException {
this.split = split;
this.job = job;
- this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
- if (this.metaClient == null) {
if (!job.getBoolean(HIVE_EVOLUTION_ENABLE, true)) {
LOG.info("Schema evolution is disabled for split: {}", split);
internalSchemaOption = Option.empty();
this.metaClient = null;
return;
}
this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
this.internalSchemaOption = getInternalSchemaFromCache();
}
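/**
* Reads the table's internal schema from the JobConf cache populated by
* HoodieCombineHiveInputFormat#getSplits, falling back to the TableSchemaResolver
* when the cache was never set up (e.g. in tests).
*/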

public Option<InternalSchema> getInternalSchemaFromCache() throws IOException {
Option<InternalSchema> internalSchemaOpt = getCachedData(
HoodieCombineHiveInputFormat.INTERNAL_SCHEMA_CACHE_KEY_PREFIX,
SerDeHelper::fromJson);
if (internalSchemaOpt == null) {
// the code path should only be invoked in tests.
return new TableSchemaResolver(this.metaClient).getTableInternalSchemaFromCommitMetadata();
}
return internalSchemaOpt;
}

public Schema getAvroSchemaFromCache() throws Exception {
Option<Schema> avroSchemaOpt = getCachedData(
HoodieCombineHiveInputFormat.SCHEMA_CACHE_KEY_PREFIX,
json -> Option.ofNullable(new Schema.Parser().parse(json)));
if (avroSchemaOpt == null) {
// the code path should only be invoked in tests.
return new TableSchemaResolver(this.metaClient).getTableAvroSchema();
}
return avroSchemaOpt.orElseThrow(() -> new HoodieValidationException("The avro schema cache should always be set up together with the internal schema cache"));
}

/**
* Returns the cached data for the given key, or null if the cache was never set up.
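* <p>A {@code null} return means the cache was never populated, so callers fall back to resolving
* the schema through the meta client; an empty cached string means the driver resolved the table
* but found no internal schema, so {@code Option.empty()} is returned; otherwise the cached JSON
* is handed to the supplied parser.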
*/
@Nullable
private <T> Option<T> getCachedData(String keyPrefix, Function<String, Option<T>> parser) throws IOException {
Option<StoragePath> tablePath = getTablePath(job, split);
if (!tablePath.isPresent()) {
return Option.empty();
}
String cacheKey = keyPrefix + "." + tablePath.get().toUri();
String cachedJson = job.get(cacheKey);
if (cachedJson == null) {
// the code path should only be invoked in tests.
return null;
}
if (cachedJson.isEmpty()) {
return Option.empty();
}
try {
- TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- this.internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
return parser.apply(cachedJson);
} catch (Exception e) {
- internalSchemaOption = Option.empty();
- LOG.warn(String.format("failed to get internal Schema from hudi table:%s", metaClient.getBasePath()), e);
LOG.warn("Failed to parse data from cache with key: {}", cacheKey, e);
return Option.empty();
}
}

private Option<StoragePath> getTablePath(JobConf job, InputSplit split) throws IOException {
if (split instanceof FileSplit) {
Path path = ((FileSplit) split).getPath();
FileSystem fs = path.getFileSystem(job);
HoodieStorage storage = new HoodieHadoopStorage(fs);
return TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
}
LOG.info("finish init schema evolution for split: {}", split);
return Option.empty();
}

- private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
private HoodieTableMetaClient setUpHoodieTableMetaClient() {
try {
Path inputPath = ((FileSplit) split).getPath();
FileSystem fs = inputPath.getFileSystem(job);
@@ -122,7 +181,7 @@ private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString())
.setConf(HadoopFSUtils.getStorageConfWithCopy(job)).build();
} catch (Exception e) {
LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit)split).getPath()), e);
LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit) split).getPath()), e);
return null;
}
}
@@ -139,13 +198,13 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
return;
}
if (internalSchemaOption.isPresent()) {
- Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
Schema tableAvroSchema = getAvroSchemaFromCache();
List<String> requiredColumns = getRequireColumn(job);
InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(),
requiredColumns);
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
- List<String> partitioningFields = partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
List<String> partitioningFields = !partitionFields.isEmpty() ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
: new ArrayList<>();
Schema writerSchema = AvroInternalSchemaConverter.convert(internalSchemaOption.get(), tableAvroSchema.getName());
writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
@@ -171,7 +230,7 @@ public void doEvolutionForParquetFormat() {
if (internalSchemaOption.isPresent()) {
// reading hoodie schema evolution table
job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
- Path finalPath = ((FileSplit)split).getPath();
Path finalPath = ((FileSplit) split).getPath();
InternalSchema prunedSchema;
List<String> requiredColumns = getRequireColumn(job);
// No need trigger schema evolution for count(*)/count(1) operation
@@ -223,12 +282,12 @@ public void setColumnTypeList(JobConf job, List<Types.Field> fields) {
private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
switch (type.typeId()) {
case RECORD:
- Types.RecordType record = (Types.RecordType)type;
Types.RecordType record = (Types.RecordType) type;
List<Types.Field> fields = record.fields();
ArrayList<TypeInfo> fieldTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
for (int index = 0; index < fields.size(); index++) {
- StructTypeInfo structTypeInfo = (StructTypeInfo)typeInfo;
StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
TypeInfo subTypeInfo = getSchemaSubTypeInfo(structTypeInfo.getAllStructFieldTypeInfos().get(index), fields.get(index).type());
fieldTypes.add(subTypeInfo);
String name = fields.get(index).name();
@@ -239,14 +298,14 @@ private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
structTypeInfo.setAllStructFieldTypeInfos(fieldTypes);
return structTypeInfo;
case ARRAY:
- ListTypeInfo listTypeInfo = (ListTypeInfo)typeInfo;
ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
Types.ArrayType array = (Types.ArrayType) type;
TypeInfo subTypeInfo = getSchemaSubTypeInfo(listTypeInfo.getListElementTypeInfo(), array.elementType());
listTypeInfo.setListElementTypeInfo(subTypeInfo);
return listTypeInfo;
case MAP:
- Types.MapType map = (Types.MapType)type;
- MapTypeInfo mapTypeInfo = (MapTypeInfo)typeInfo;
Types.MapType map = (Types.MapType) type;
MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
TypeInfo keyType = getSchemaSubTypeInfo(mapTypeInfo.getMapKeyTypeInfo(), map.keyType());
TypeInfo valueType = getSchemaSubTypeInfo(mapTypeInfo.getMapValueTypeInfo(), map.valueType());
MapTypeInfo mapType = new MapTypeInfo();
@@ -296,9 +355,9 @@ private void pushDownFilter(JobConf job, InternalSchema querySchema, InternalSch
for (int i = 0; i < size; i++) {
ExprNodeDesc expr = exprNodes.poll();
if (expr instanceof ExprNodeColumnDesc) {
- String oldColumn = ((ExprNodeColumnDesc)expr).getColumn();
String oldColumn = ((ExprNodeColumnDesc) expr).getColumn();
String newColumn = InternalSchemaUtils.reBuildFilterName(oldColumn, fileSchema, querySchema);
- ((ExprNodeColumnDesc)expr).setColumn(newColumn);
((ExprNodeColumnDesc) expr).setColumn(newColumn);
}
List<ExprNodeDesc> children = expr.getChildren();
if (children != null) {
HoodieCombineHiveInputFormat.java
@@ -18,13 +18,24 @@

package org.apache.hudi.hadoop.hive;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormatBase;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -96,7 +107,7 @@
* CombineHiveInputFormat is a parameterized InputFormat which looks at the path name and determine the correct
* InputFormat for that path name from mapredPlan.pathToPartitionInfo(). It can be used to read files with different
* input format in the same map-reduce job.
- *
* <p>
* NOTE : This class is implemented to work with Hive 2.x +
*/
public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extends Writable>
@@ -108,6 +119,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
// max number of threads we can use to check non-combinable paths
private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
public static final String INTERNAL_SCHEMA_CACHE_KEY_PREFIX = "hudi.hive.internal.schema.cache.key.prefix";
public static final String SCHEMA_CACHE_KEY_PREFIX = "hudi.hive.schema.cache.key.prefix";
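// Cache keys are composed as "<prefix>.<table base path URI>". getSplits resolves each table's
// schemas once and stores them under these keys (empty values when the table has no internal
// schema); SchemaEvolutionContext then reads them per split instead of reloading the active timeline.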

protected String getParquetInputFormatClassName() {
return HoodieParquetInputFormat.class.getName();
@@ -375,6 +388,43 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
// clear work from ThreadLocal after splits generated in case of thread is reused in pool.
Utilities.clearWorkMapForConf(job);

// build internal schema for the query
if (!result.isEmpty()) {
ArrayList<String> uniqTablePaths = new ArrayList<>();
Arrays.stream(paths).forEach(path -> {
final HoodieStorage storage;
try {
storage = new HoodieHadoopStorage(path.getFileSystem(job));
Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
if (tablePath.isPresent()) {
uniqTablePaths.add(tablePath.get().toUri().toString());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});
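// With the table base paths collected, resolve each table's Avro and internal schema once on the
// driver and publish them in the JobConf, so that per-split readers (SchemaEvolutionContext) do
// not need to reload the active timeline.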

try {
for (String path : uniqTablePaths) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(new HadoopStorageConfiguration(job)).build();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String avroSchema = schemaUtil.getTableAvroSchema().toString();
Option<InternalSchema> internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata();
if (internalSchema.isPresent()) {
LOG.info("Set internal and avro schema cache with path: " + path);
job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema);
job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, SerDeHelper.toJson(internalSchema.get()));
} else {
// always set up the cache so that we can distinguish from the scenario where the cache was never set (e.g. in tests).
job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
}
}
} catch (Exception e) {
LOG.warn("Fail to set schema cache", e);
}
}

LOG.info("Number of all splits " + result.size());
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return result.toArray(new InputSplit[result.size()]);
@@ -390,14 +440,15 @@ private void processPaths(JobConf job, CombineFileInputFormatShim combine, List<
/**
* HiveFileFormatUtils.getPartitionDescFromPathRecursively is no longer available since Hive 3.
* This method is to make it compatible with both Hive 2 and Hive 3.
*
* @param pathToPartitionInfo
* @param dir
* @param cacheMap
* @return
* @throws IOException
*/
private static PartitionDesc getPartitionFromPath(Map<Path, PartitionDesc> pathToPartitionInfo, Path dir,
Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> cacheMap)
throws IOException {
Method method;
try {
@@ -595,7 +646,7 @@ public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throw
}

public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim,
Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException {
this.inputSplitShim = inputSplitShim;
this.pathToPartitionInfo = pathToPartitionInfo;
if (job != null) {
@@ -767,7 +818,7 @@ private static class CombinePathInputFormat {
private final String deserializerClassName;

public CombinePathInputFormat(List<Operator<? extends OperatorDesc>> opList, String inputFormatClassName,
String deserializerClassName) {
this.opList = opList;
this.inputFormatClassName = inputFormatClassName;
this.deserializerClassName = deserializerClassName;
@@ -930,13 +981,13 @@ public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOExcepti
int counter = 0;
for (int pos = 0; pos < splits.length; pos++) {
if (counter == maxSize - 1 || pos == splits.length - 1) {
- builder.addSplit((FileSplit)splits[pos]);
builder.addSplit((FileSplit) splits[pos]);
combineFileSplits.add(builder.build(job));
builder = new HoodieCombineRealtimeFileSplit.Builder();
counter = 0;
} else if (counter < maxSize) {
counter++;
- builder.addSplit((FileSplit)splits[pos]);
builder.addSplit((FileSplit) splits[pos]);
}
}
return combineFileSplits.toArray(new CombineFileSplit[combineFileSplits.size()]);
@@ -963,7 +1014,7 @@ public HadoopShimsSecure.InputSplitShim getInputSplitShim() {

@Override
public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
Class<RecordReader<K, V>> rrClass) throws IOException {
isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false"));
if (isRealTime) {
List<RecordReader> recordReaders = new LinkedList<>();
(Diff for the third changed file is not shown.)
