[HUDI-8182] Cache internalSchema for hive read, avoid each split reloa… #11914

Open · wants to merge 2 commits into base: master
SchemaEvolutionContext.java
@@ -26,7 +26,9 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -36,6 +38,7 @@
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
@@ -63,12 +66,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
@@ -98,22 +104,75 @@ public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException
public SchemaEvolutionContext(InputSplit split, JobConf job, Option<HoodieTableMetaClient> metaClientOption) throws IOException {
this.split = split;
this.job = job;
this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
if (this.metaClient == null) {
if (!job.getBoolean(HIVE_EVOLUTION_ENABLE, true)) {
LOG.info("Schema evolution is disabled for split: {}", split);
internalSchemaOption = Option.empty();
this.metaClient = null;
return;
}
this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
this.internalSchemaOption = getInternalSchemaFromCache();
}

public Option<InternalSchema> getInternalSchemaFromCache() throws IOException {
Option<InternalSchema> internalSchemaOpt = getCachedData(
HoodieCombineHiveInputFormat.INTERNAL_SCHEMA_CACHE_KEY_PREFIX,
SerDeHelper::fromJson);
if (internalSchemaOpt == null) {
// the code path should only be invoked in tests.
return new TableSchemaResolver(this.metaClient).getTableInternalSchemaFromCommitMetadata();
}
return internalSchemaOpt;
}

public Schema getAvroSchemaFromCache() throws Exception {
Option<Schema> avroSchemaOpt = getCachedData(
HoodieCombineHiveInputFormat.SCHEMA_CACHE_KEY_PREFIX,
json -> Option.ofNullable(new Schema.Parser().parse(json)));
if (avroSchemaOpt == null) {
// the code path should only be invoked in tests.
return new TableSchemaResolver(this.metaClient).getTableAvroSchema();
}
return avroSchemaOpt.orElseThrow(() -> new HoodieValidationException("The avro schema cache should always be set up together with the internal schema cache"));
}

/**
* Returns the cache data with given key or null if the cache was never set up.
*/
@Nullable
private <T> Option<T> getCachedData(String keyPrefix, Function<String, Option<T>> parser) throws IOException {
Option<StoragePath> tablePath = getTablePath(job, split);
if (!tablePath.isPresent()) {
return Option.empty();
}
String cacheKey = keyPrefix + "." + tablePath.get().toUri();
String cachedJson = job.get(cacheKey);
if (cachedJson == null) {
// the code path should only be invoked in tests.
return null;
}
if (cachedJson.isEmpty()) {
return Option.empty();
}
try {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
this.internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
return parser.apply(cachedJson);
} catch (Exception e) {
internalSchemaOption = Option.empty();
LOG.warn(String.format("failed to get internal Schema from hudi table:%s", metaClient.getBasePath()), e);
LOG.warn("Failed to parse data from cache with key: {}", cacheKey, e);
return Option.empty();
}
}

private Option<StoragePath> getTablePath(JobConf job, InputSplit split) throws IOException {
if (split instanceof FileSplit) {
Path path = ((FileSplit) split).getPath();
FileSystem fs = path.getFileSystem(job);
HoodieStorage storage = new HoodieHadoopStorage(fs);
return TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
}
LOG.info("finish init schema evolution for split: {}", split);
return Option.empty();
}

private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
private HoodieTableMetaClient setUpHoodieTableMetaClient() {
try {
Path inputPath = ((FileSplit) split).getPath();
FileSystem fs = inputPath.getFileSystem(job);
@@ -122,7 +181,7 @@ private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString())
.setConf(HadoopFSUtils.getStorageConfWithCopy(job)).build();
} catch (Exception e) {
LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit)split).getPath()), e);
LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit) split).getPath()), e);
return null;
}
}
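
The new read path above gives each split a three-state contract against the JobConf: a missing key means the cache was never populated (e.g. in tests), so the context falls back to TableSchemaResolver; an empty string means the cache was populated but the table has no internal schema; anything else is JSON to be parsed. The following minimal sketch illustrates that contract with java.util.Optional and made-up values; the table path and toy parser are assumptions for illustration, not the production wiring, which uses Hudi's Option, SerDeHelper.fromJson and Schema.Parser.

// Minimal sketch of the three-state cache lookup used on the split side.
// Assumptions: the table path and schema JSON below are made up; the real code
// parses with SerDeHelper.fromJson / Schema.Parser instead of the toy parser here.
import java.util.Optional;
import java.util.function.Function;

import org.apache.hadoop.mapred.JobConf;

public class SchemaCacheLookupSketch {
  static final String INTERNAL_SCHEMA_PREFIX = "hudi.hive.internal.schema.cache.key.prefix";

  // Mirrors getCachedData: null = cache never set up, Optional.empty() = no schema, value = parsed schema.
  static <T> Optional<T> getCachedData(JobConf job, String keyPrefix, String tablePathUri,
                                       Function<String, Optional<T>> parser) {
    String cacheKey = keyPrefix + "." + tablePathUri;
    String cachedJson = job.get(cacheKey);
    if (cachedJson == null) {
      return null;               // cache never populated (e.g. tests) -> caller falls back to TableSchemaResolver
    }
    if (cachedJson.isEmpty()) {
      return Optional.empty();   // cache populated, but the table has no internal schema
    }
    return parser.apply(cachedJson);
  }

  public static void main(String[] args) {
    JobConf job = new JobConf();
    String tablePath = "hdfs://ns/warehouse/db.db/hudi_tbl";   // hypothetical table base path
    // The driver side would have published the schema JSON under this key in getSplits.
    job.set(INTERNAL_SCHEMA_PREFIX + "." + tablePath, "{\"stub\":\"internal-schema-json\"}");

    Optional<String> schema = getCachedData(job, INTERNAL_SCHEMA_PREFIX, tablePath, Optional::of);
    System.out.println(schema.orElse("<none>"));   // prints the cached JSON without touching the timeline
  }
}
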
@@ -139,13 +198,13 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
return;
}
if (internalSchemaOption.isPresent()) {
Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
Schema tableAvroSchema = getAvroSchemaFromCache();
List<String> requiredColumns = getRequireColumn(job);
InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(),
requiredColumns);
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
List<String> partitioningFields = partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
List<String> partitioningFields = !partitionFields.isEmpty() ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
: new ArrayList<>();
Schema writerSchema = AvroInternalSchemaConverter.convert(internalSchemaOption.get(), tableAvroSchema.getName());
writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
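
In the realtime path above, Hive hands over the partition columns as a single '/'-separated property (via hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS), and the split result is appended to the writer schema so the resulting rows carry null values for those fields. A tiny illustrative sketch of just the splitting step, using a made-up property value:

// Illustrative only: "dt/hh" is a hypothetical value of the partition-columns property.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionFieldSplitSketch {
  public static void main(String[] args) {
    String partitionFields = "dt/hh";   // e.g. a table partitioned by dt and hh
    List<String> partitioningFields = !partitionFields.isEmpty()
        ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
        : new ArrayList<>();
    System.out.println(partitioningFields);   // prints [dt, hh]
  }
}
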
@@ -171,7 +230,7 @@ public void doEvolutionForParquetFormat() {
if (internalSchemaOption.isPresent()) {
// reading hoodie schema evolution table
job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
Path finalPath = ((FileSplit)split).getPath();
Path finalPath = ((FileSplit) split).getPath();
InternalSchema prunedSchema;
List<String> requiredColumns = getRequireColumn(job);
// No need trigger schema evolution for count(*)/count(1) operation
@@ -223,12 +282,12 @@ public void setColumnTypeList(JobConf job, List<Types.Field> fields) {
private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
switch (type.typeId()) {
case RECORD:
Types.RecordType record = (Types.RecordType)type;
Types.RecordType record = (Types.RecordType) type;
List<Types.Field> fields = record.fields();
ArrayList<TypeInfo> fieldTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
ArrayList<TypeInfo> fieldTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
for (int index = 0; index < fields.size(); index++) {
StructTypeInfo structTypeInfo = (StructTypeInfo)typeInfo;
StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
TypeInfo subTypeInfo = getSchemaSubTypeInfo(structTypeInfo.getAllStructFieldTypeInfos().get(index), fields.get(index).type());
fieldTypes.add(subTypeInfo);
String name = fields.get(index).name();
@@ -239,14 +298,14 @@ private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
structTypeInfo.setAllStructFieldTypeInfos(fieldTypes);
return structTypeInfo;
case ARRAY:
ListTypeInfo listTypeInfo = (ListTypeInfo)typeInfo;
ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
Types.ArrayType array = (Types.ArrayType) type;
TypeInfo subTypeInfo = getSchemaSubTypeInfo(listTypeInfo.getListElementTypeInfo(), array.elementType());
listTypeInfo.setListElementTypeInfo(subTypeInfo);
return listTypeInfo;
case MAP:
Types.MapType map = (Types.MapType)type;
MapTypeInfo mapTypeInfo = (MapTypeInfo)typeInfo;
Types.MapType map = (Types.MapType) type;
MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
TypeInfo keyType = getSchemaSubTypeInfo(mapTypeInfo.getMapKeyTypeInfo(), map.keyType());
TypeInfo valueType = getSchemaSubTypeInfo(mapTypeInfo.getMapValueTypeInfo(), map.valueType());
MapTypeInfo mapType = new MapTypeInfo();
@@ -296,9 +355,9 @@ private void pushDownFilter(JobConf job, InternalSchema querySchema, InternalSch
for (int i = 0; i < size; i++) {
ExprNodeDesc expr = exprNodes.poll();
if (expr instanceof ExprNodeColumnDesc) {
String oldColumn = ((ExprNodeColumnDesc)expr).getColumn();
String oldColumn = ((ExprNodeColumnDesc) expr).getColumn();
String newColumn = InternalSchemaUtils.reBuildFilterName(oldColumn, fileSchema, querySchema);
((ExprNodeColumnDesc)expr).setColumn(newColumn);
((ExprNodeColumnDesc) expr).setColumn(newColumn);
}
List<ExprNodeDesc> children = expr.getChildren();
if (children != null) {
HoodieCombineHiveInputFormat.java
@@ -18,13 +18,24 @@

package org.apache.hudi.hadoop.hive;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormatBase;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -96,7 +107,7 @@
* CombineHiveInputFormat is a parameterized InputFormat which looks at the path name and determine the correct
* InputFormat for that path name from mapredPlan.pathToPartitionInfo(). It can be used to read files with different
* input format in the same map-reduce job.
*
* <p>
* NOTE : This class is implemented to work with Hive 2.x +
*/
public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extends Writable>
@@ -108,6 +119,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
// max number of threads we can use to check non-combinable paths
private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
public static final String INTERNAL_SCHEMA_CACHE_KEY_PREFIX = "hudi.hive.internal.schema.cache.key.prefix";
public static final String SCHEMA_CACHE_KEY_PREFIX = "hudi.hive.schema.cache.key.prefix";

protected String getParquetInputFormatClassName() {
return HoodieParquetInputFormat.class.getName();
@@ -375,6 +388,43 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
// clear work from ThreadLocal after splits generated in case of thread is reused in pool.
Utilities.clearWorkMapForConf(job);

// build internal schema for the query
if (!result.isEmpty()) {
ArrayList<String> uniqTablePaths = new ArrayList<>();
Arrays.stream(paths).forEach(path -> {
final HoodieStorage storage;
try {
storage = new HoodieHadoopStorage(path.getFileSystem(job));
Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
if (tablePath.isPresent()) {
uniqTablePaths.add(tablePath.get().toUri().toString());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});

try {
for (String path : uniqTablePaths) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(new HadoopStorageConfiguration(job)).build();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String avroSchema = schemaUtil.getTableAvroSchema().toString();
Contributor: yeah, the cache would be utilized.

Option<InternalSchema> internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata();
if (internalSchema.isPresent()) {
LOG.info("Set internal and avro schema cache with path: " + path);
job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema);
job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, SerDeHelper.toJson(internalSchema.get()));
} else {
// always set up the cache so that we can distinguish this from the scenario where the cache was never set (e.g. in tests).
job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
}
}
} catch (Exception e) {
LOG.warn("Fail to set schema cache", e);
}
}

LOG.info("Number of all splits " + result.size());
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return result.toArray(new InputSplit[result.size()]);
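
On the driver side, the new block in getSplits resolves each distinct table base path once, loads the Avro and internal schemas through TableSchemaResolver, and publishes them into the JobConf under the two prefixes declared above, so split-side readers no longer rebuild a HoodieTableMetaClient per split. A minimal sketch of just the publish step follows; the paths and schema strings are hypothetical stand-ins for the real TablePathUtils and TableSchemaResolver calls.

// Minimal sketch of the driver-side cache publish step (paths and schema JSON are made up;
// the real code obtains them from TablePathUtils and TableSchemaResolver).
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.mapred.JobConf;

public class SchemaCachePublishSketch {
  static final String SCHEMA_PREFIX = "hudi.hive.schema.cache.key.prefix";
  static final String INTERNAL_SCHEMA_PREFIX = "hudi.hive.internal.schema.cache.key.prefix";

  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Deduplicated table base paths derived from the input split paths (hypothetical values).
    Set<String> uniqTablePaths = new LinkedHashSet<>(
        Arrays.asList("hdfs://ns/warehouse/db.db/tbl_a", "hdfs://ns/warehouse/db.db/tbl_b"));

    for (String path : uniqTablePaths) {
      // Stand-ins for schemaUtil.getTableAvroSchema() / getTableInternalSchemaFromCommitMetadata().
      String avroSchemaJson = "{\"type\":\"record\",\"name\":\"stub\",\"fields\":[]}";
      boolean hasInternalSchema = path.endsWith("tbl_a");   // pretend only tbl_a evolved its schema

      if (hasInternalSchema) {
        job.set(SCHEMA_PREFIX + "." + path, avroSchemaJson);
        job.set(INTERNAL_SCHEMA_PREFIX + "." + path, "{\"stub\":\"internal-schema-json\"}");
      } else {
        // Empty values mark "cache populated, but no internal schema", so readers can
        // still tell this apart from "cache never set up at all".
        job.set(SCHEMA_PREFIX + "." + path, "");
        job.set(INTERNAL_SCHEMA_PREFIX + "." + path, "");
      }
    }

    // A split-side reader for tbl_a now needs only a conf lookup instead of reloading the table metadata.
    System.out.println(job.get(INTERNAL_SCHEMA_PREFIX + ".hdfs://ns/warehouse/db.db/tbl_a"));
  }
}
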
@@ -390,14 +440,15 @@ private void processPaths(JobConf job, CombineFileInputFormatShim combine, List<
/**
* HiveFileFormatUtils.getPartitionDescFromPathRecursively is no longer available since Hive 3.
* This method is to make it compatible with both Hive 2 and Hive 3.
*
* @param pathToPartitionInfo
* @param dir
* @param cacheMap
* @return
* @throws IOException
*/
private static PartitionDesc getPartitionFromPath(Map<Path, PartitionDesc> pathToPartitionInfo, Path dir,
Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> cacheMap)
Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> cacheMap)
throws IOException {
Method method;
try {
@@ -594,7 +645,7 @@ public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throw
}

public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim,
Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException {
Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException {
this.inputSplitShim = inputSplitShim;
this.pathToPartitionInfo = pathToPartitionInfo;
if (job != null) {
@@ -766,7 +817,7 @@ private static class CombinePathInputFormat {
private final String deserializerClassName;

public CombinePathInputFormat(List<Operator<? extends OperatorDesc>> opList, String inputFormatClassName,
String deserializerClassName) {
String deserializerClassName) {
this.opList = opList;
this.inputFormatClassName = inputFormatClassName;
this.deserializerClassName = deserializerClassName;
@@ -929,13 +980,13 @@ public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOExcepti
int counter = 0;
for (int pos = 0; pos < splits.length; pos++) {
if (counter == maxSize - 1 || pos == splits.length - 1) {
builder.addSplit((FileSplit)splits[pos]);
builder.addSplit((FileSplit) splits[pos]);
combineFileSplits.add(builder.build(job));
builder = new HoodieCombineRealtimeFileSplit.Builder();
counter = 0;
} else if (counter < maxSize) {
counter++;
builder.addSplit((FileSplit)splits[pos]);
builder.addSplit((FileSplit) splits[pos]);
}
}
return combineFileSplits.toArray(new CombineFileSplit[combineFileSplits.size()]);
@@ -962,7 +1013,7 @@ public HadoopShimsSecure.InputSplitShim getInputSplitShim() {

@Override
public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
Class<RecordReader<K, V>> rrClass) throws IOException {
Class<RecordReader<K, V>> rrClass) throws IOException {
isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false"));
if (isRealTime) {
List<RecordReader> recordReaders = new LinkedList<>();