diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp index df489d4a9d7a..ebd1c33cf197 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp @@ -55,7 +55,7 @@ static Int64 findMinPosition(const NameSet & condition_table_columns, const Name return min_position; } -MergeTreeTable MergeTreeRelParser::parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table) +MergeTreeTableInstance MergeTreeRelParser::parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table) { logDebugMessage(extension_table, "merge_tree_table"); google::protobuf::StringValue table; @@ -63,14 +63,12 @@ MergeTreeTable MergeTreeRelParser::parseMergeTreeTable(const substrait::ReadRel: return parseMergeTreeTableString(table.value()); } -CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context, bool restore) +CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context) { const DB::Block header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key); const auto metadata = buildMetaData(header, context, merge_tree_table); - // use instance global table (without uuid) to restore metadata folder on current instance - // we need its lock - auto global_storage = StorageMergeTreeFactory::getStorage( + return StorageMergeTreeFactory::getStorage( StorageID(merge_tree_table.database, merge_tree_table.table), merge_tree_table.snapshot_id, merge_tree_table, @@ -87,15 +85,19 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(const MergeTreeTable buildMergeTreeSettings(merge_tree_table.table_configs)); return custom_storage_merge_tree; }); +} - if (restore) - restoreMetaData(global_storage, merge_tree_table, *context); - - return global_storage; +CustomStorageMergeTreePtr +MergeTreeRelParser::parseStorageAndRestore(const MergeTreeTableInstance & merge_tree_table, const ContextMutablePtr & context) +{ + auto result = parseStorage(merge_tree_table, context); + restoreMetaData(result, merge_tree_table, *context); + return result; } -CustomStorageMergeTreePtr MergeTreeRelParser::copyToDefaultPolicyStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context) +CustomStorageMergeTreePtr MergeTreeRelParser::copyToDefaultPolicyStorage(const MergeTreeTable & table, ContextMutablePtr context) { + MergeTreeTable merge_tree_table{table}; auto temp_uuid = UUIDHelpers::generateV4(); String temp_uuid_str = toString(temp_uuid); merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str; @@ -105,8 +107,9 @@ CustomStorageMergeTreePtr MergeTreeRelParser::copyToDefaultPolicyStorage(MergeTr return parseStorage(merge_tree_table, context); } -CustomStorageMergeTreePtr MergeTreeRelParser::copyToVirtualStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context) +CustomStorageMergeTreePtr MergeTreeRelParser::copyToVirtualStorage(const MergeTreeTable & table, const ContextMutablePtr & context) { + MergeTreeTable merge_tree_table{table}; auto temp_uuid = UUIDHelpers::generateV4(); String temp_uuid_str = toString(temp_uuid); merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str; @@ -120,7 +123,7 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel( auto merge_tree_table = parseMergeTreeTable(extension_table); // ignore snapshot id for query merge_tree_table.snapshot_id = ""; - auto storage = parseStorage(merge_tree_table, global_context, true); + auto storage = parseStorageAndRestore(merge_tree_table, global_context); DB::Block input; if (rel.has_base_schema() && rel.base_schema().names_size()) @@ -368,22 +371,27 @@ void MergeTreeRelParser::collectColumns(const substrait::Expression & rel, NameS } } - -String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func) +String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func) const { auto func_signature = getPlanParser()->function_mapping.at(std::to_string(substrait_func.function_reference())); return getPlanParser()->getFunctionName(func_signature, substrait_func); } +MergeTreeTableInstance xparseMergeTreeTable(const substrait::extensions::AdvancedExtension & extension) +{ + logDebugMessage(extension, "merge_tree_table"); + + google::protobuf::StringValue table; + table.ParseFromString(extension.enhancement().value()); + return parseMergeTreeTableString(table.value()); +} String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_rel) { - google::protobuf::StringValue table; - table.ParseFromString(read_rel.advanced_extension().enhancement().value()); - auto merge_tree_table = parseMergeTreeTableString(table.value()); + auto merge_tree_table = xparseMergeTreeTable(read_rel.advanced_extension()); // ignore snapshot id for query merge_tree_table.snapshot_id = ""; - auto custom_storage_mergetree = parseStorage(merge_tree_table, global_context, true); + auto custom_storage_mergetree = parseStorageAndRestore(merge_tree_table, global_context); auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema()); auto names_and_types_list = input.getNamesAndTypesList(); diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h index b6d6c989835c..44b6a81cd9d2 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h @@ -39,19 +39,18 @@ using namespace DB; class MergeTreeRelParser : public RelParser { public: - static CustomStorageMergeTreePtr parseStorage( - const MergeTreeTable & merge_tree_table, ContextMutablePtr context, bool restore = false); + static CustomStorageMergeTreePtr parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context); + static CustomStorageMergeTreePtr + parseStorageAndRestore(const MergeTreeTableInstance & merge_tree_table, const ContextMutablePtr & context); // Create random table name and table path and use default storage policy. // In insert case, mergetree data can be uploaded after merges in default storage(Local Disk). - static CustomStorageMergeTreePtr - copyToDefaultPolicyStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context); + static CustomStorageMergeTreePtr copyToDefaultPolicyStorage(const MergeTreeTable & table, ContextMutablePtr context); - // Use same table path and data path as the originial table. - static CustomStorageMergeTreePtr - copyToVirtualStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context); + // Use same table path and data path as the original table. + static CustomStorageMergeTreePtr copyToVirtualStorage(const MergeTreeTable & table, const ContextMutablePtr & context); - static MergeTreeTable parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table); + static MergeTreeTableInstance parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table); explicit MergeTreeRelParser(SerializedPlanParser * plan_paser_, const ContextPtr & context_) : RelParser(plan_paser_), context(context_), global_context(plan_paser_->global_context) @@ -100,8 +99,8 @@ class MergeTreeRelParser : public RelParser void parseToAction(ActionsDAG & filter_action, const substrait::Expression & rel, std::string & filter_name); PrewhereInfoPtr parsePreWhereInfo(const substrait::Expression & rel, Block & input); ActionsDAG optimizePrewhereAction(const substrait::Expression & rel, std::string & filter_name, Block & block); - String getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func); - void collectColumns(const substrait::Expression & rel, NameSet & columns, Block & block); + String getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func) const; + static void collectColumns(const substrait::Expression & rel, NameSet & columns, Block & block); UInt64 getColumnsSize(const NameSet & columns); const ContextPtr & context; diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp index bfe15d5fa0aa..13c31d25bcdb 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp @@ -75,10 +75,10 @@ void CacheManager::initialize(DB::ContextMutablePtr context_) struct CacheJobContext { - MergeTreeTable table; + MergeTreeTableInstance table; }; -Task CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& part, const std::unordered_set & columns) +Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns) { CacheJobContext job_context{table}; job_context.table.parts.clear(); @@ -88,7 +88,8 @@ Task CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& p { try { - auto storage = MergeTreeRelParser::parseStorage(job_detail.table, context, true); + auto storage = MergeTreeRelParser::parseStorageAndRestore(job_detail.table, context); + auto storage_snapshot = std::make_shared(*storage, storage->getInMemoryMetadataPtr()); NamesAndTypesList names_and_types_list; auto meta_columns = storage->getInMemoryMetadata().getColumns(); @@ -132,9 +133,8 @@ Task CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& p return std::move(task); } -JobId CacheManager::cacheParts(const String& table_def, const std::unordered_set& columns) +JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set& columns) { - auto table = parseMergeTreeTableString(table_def); JobId id = toString(UUIDHelpers::generateV4()); Job job(id); for (const auto & part : table.parts) @@ -148,7 +148,7 @@ JobId CacheManager::cacheParts(const String& table_def, const std::unordered_set jobject CacheManager::getCacheStatus(JNIEnv * env, const String & jobId) { - auto& scheduler = JobScheduler::instance(); + auto & scheduler = JobScheduler::instance(); auto job_status = scheduler.getJobSatus(jobId); int status = 0; String message; diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h b/cpp-ch/local-engine/Storages/Cache/CacheManager.h index 6335f86bb162..2c1c010432dd 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h @@ -18,37 +18,37 @@ #include +#include #include #include -#include #include namespace local_engine { struct MergeTreePart; -struct MergeTreeTable; - - +struct MergeTreeTableInstance; /*** * Manage the cache of the MergeTree, mainly including meta.bin, data.bin, metadata.gluten */ -class CacheManager { +class CacheManager +{ public: static jclass cache_result_class; static jmethodID cache_result_constructor; - static void initJNI(JNIEnv* env); + static void initJNI(JNIEnv * env); static CacheManager & instance(); static void initialize(DB::ContextMutablePtr context); - Task cachePart(const MergeTreeTable& table, const MergeTreePart& part, const std::unordered_set& columns); - JobId cacheParts(const String& table_def, const std::unordered_set& columns); - static jobject getCacheStatus(JNIEnv * env, const String& jobId); + JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set & columns); + static jobject getCacheStatus(JNIEnv * env, const String & jobId); Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder); JobId cacheFiles(substrait::ReadRel::LocalFiles file_infos); static void removeFiles(String file, String cache_name); + private: + Task cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns); CacheManager() = default; DB::ContextMutablePtr context; }; diff --git a/cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.cpp b/cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.cpp index 8dbf13c90d5a..1dd7b5d830a8 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.cpp @@ -150,12 +150,9 @@ std::unique_ptr buildQueryInfo(NamesAndTypesList & names_and_ty return query_info; } - -MergeTreeTable parseMergeTreeTableString(const std::string & info) +void parseMergeTreeTableString(MergeTreeTable & table, ReadBufferFromString & in) { - ReadBufferFromString in(info); assertString("MergeTree;", in); - MergeTreeTable table; readString(table.database, in); assertChar('\n', in); readString(table.table, in); @@ -189,6 +186,14 @@ MergeTreeTable parseMergeTreeTableString(const std::string & info) readString(json, in); parseTableConfig(table.table_configs, json); assertChar('\n', in); +} + +MergeTreeTableInstance parseMergeTreeTableString(const std::string & info) +{ + MergeTreeTableInstance result; + ReadBufferFromString in(info); + parseMergeTreeTableString(result, in); + while (!in.eof()) { MergeTreePart part; @@ -198,12 +203,13 @@ MergeTreeTable parseMergeTreeTableString(const std::string & info) assertChar('\n', in); readIntText(part.end, in); assertChar('\n', in); - table.parts.emplace_back(part); + result.parts.emplace_back(part); } - return table; + + return result; } -std::unordered_set MergeTreeTable::getPartNames() const +std::unordered_set MergeTreeTableInstance::getPartNames() const { std::unordered_set names; for (const auto & part : parts) @@ -211,7 +217,7 @@ std::unordered_set MergeTreeTable::getPartNames() const return names; } -RangesInDataParts MergeTreeTable::extractRange(DataPartsVector parts_vector) const +RangesInDataParts MergeTreeTableInstance::extractRange(DataPartsVector parts_vector) const { std::unordered_map name_index; std::ranges::for_each(parts_vector, [&](const DataPartPtr & part) { name_index.emplace(part->name, part); }); diff --git a/cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.h b/cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.h index dfeb477f157a..0648b2c81b61 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.h +++ b/cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.h @@ -26,6 +26,10 @@ #include #include +namespace DB +{ +class ReadBufferFromString; +} namespace local_engine { using namespace DB; @@ -44,7 +48,7 @@ struct MergeTreeTableSettings struct MergeTreeTable { - inline static const std::string TUPLE = "tuple()"; + static constexpr std::string_view TUPLE = "tuple()"; std::string database; std::string table; std::string snapshot_id; @@ -58,10 +62,15 @@ struct MergeTreeTable std::string relative_path; std::string absolute_path; MergeTreeTableSettings table_configs; + + bool sameStructWith(const MergeTreeTable & other) const; +}; + +struct MergeTreeTableInstance : MergeTreeTable +{ std::vector parts; std::unordered_set getPartNames() const; RangesInDataParts extractRange(DataPartsVector parts_vector) const; - bool sameStructWith(const MergeTreeTable & other) const; }; std::shared_ptr @@ -71,6 +80,5 @@ std::unique_ptr buildMergeTreeSettings(const MergeTreeTableSe std::unique_ptr buildQueryInfo(NamesAndTypesList & names_and_types_list); -MergeTreeTable parseMergeTreeTableString(const std::string & info); - +MergeTreeTableInstance parseMergeTreeTableString(const std::string & info); } diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp index 1a8b71eb1ecc..e1ed86b528ac 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp @@ -54,7 +54,7 @@ std::unordered_map extractPartMetaData(ReadBuffer & in) return result; } -void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context) +void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context) { const auto data_disk = storage->getStoragePolicy()->getAnyDisk(); if (!data_disk->isRemote()) diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h index b093f687ab10..93ad461f99cd 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h +++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h @@ -23,7 +23,7 @@ namespace local_engine { -void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context); +void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context); void saveFileStatus( const DB::MergeTreeData & storage, const DB::ContextPtr & context, const String & part_name, IDataPartStorage & data_part_storage); diff --git a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp index f33a69bf21ae..f882d7fab1d3 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp @@ -66,7 +66,7 @@ CustomStorageMergeTreePtr StorageMergeTreeFactory::getStorage( const auto table_name = getTableName(id, snapshot_id); std::lock_guard lock(storage_map_mutex); - merge_tree_table.parts.clear(); + // merge_tree_table.parts.clear(); if (storage_map->has(table_name) && !storage_map->get(table_name)->second.sameStructWith(merge_tree_table)) { freeStorage(id); diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 3e5369026ac3..72a3298e8840 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1296,7 +1296,8 @@ Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * std::unordered_set column_set; for (const auto & col : tokenizer) column_set.insert(col); - auto id = local_engine::CacheManager::instance().cacheParts(table_def, column_set); + auto table = local_engine::parseMergeTreeTableString(table_def); + auto id = local_engine::CacheManager::instance().cacheParts(table, column_set); return local_engine::charTojstring(env, id.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr); } diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp index 9efbc8d9a23b..d8c4ea70458f 100644 --- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp +++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp @@ -425,7 +425,7 @@ TEST(WritePipeline, SparkMergeTree) merging_params, std::move(storage_settings));*/ - GlutenMergeTreeWriteSettings gm_write_settings{ + MergeTreePartitionWriteSettings gm_write_settings{ .part_name_prefix{"this_is_prefix"}, }; gm_write_settings.set(context);