Skip to content

Commit

Permalink
MergeTreeTableInstance
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Sep 3, 2024
1 parent 838746d commit 1db8664
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 60 deletions.
44 changes: 26 additions & 18 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,20 @@ static Int64 findMinPosition(const NameSet & condition_table_columns, const Name
return min_position;
}

MergeTreeTable MergeTreeRelParser::parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table)
MergeTreeTableInstance MergeTreeRelParser::parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table)
{
logDebugMessage(extension_table, "merge_tree_table");
google::protobuf::StringValue table;
table.ParseFromString(extension_table.detail().value());
return parseMergeTreeTableString(table.value());
}

CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context, bool restore)
CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context)
{
const DB::Block header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key);
const auto metadata = buildMetaData(header, context, merge_tree_table);

// use instance global table (without uuid) to restore metadata folder on current instance
// we need its lock
auto global_storage = StorageMergeTreeFactory::getStorage(
return StorageMergeTreeFactory::getStorage(
StorageID(merge_tree_table.database, merge_tree_table.table),
merge_tree_table.snapshot_id,
merge_tree_table,
Expand All @@ -87,15 +85,19 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(const MergeTreeTable
buildMergeTreeSettings(merge_tree_table.table_configs));
return custom_storage_merge_tree;
});
}

if (restore)
restoreMetaData(global_storage, merge_tree_table, *context);

return global_storage;
CustomStorageMergeTreePtr
MergeTreeRelParser::parseStorageAndRestore(const MergeTreeTableInstance & merge_tree_table, const ContextMutablePtr & context)
{
auto result = parseStorage(merge_tree_table, context);
restoreMetaData(result, merge_tree_table, *context);
return result;
}

CustomStorageMergeTreePtr MergeTreeRelParser::copyToDefaultPolicyStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context)
CustomStorageMergeTreePtr MergeTreeRelParser::copyToDefaultPolicyStorage(const MergeTreeTable & table, ContextMutablePtr context)
{
MergeTreeTable merge_tree_table{table};
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str;
Expand All @@ -105,8 +107,9 @@ CustomStorageMergeTreePtr MergeTreeRelParser::copyToDefaultPolicyStorage(MergeTr
return parseStorage(merge_tree_table, context);
}

CustomStorageMergeTreePtr MergeTreeRelParser::copyToVirtualStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context)
CustomStorageMergeTreePtr MergeTreeRelParser::copyToVirtualStorage(const MergeTreeTable & table, const ContextMutablePtr & context)
{
MergeTreeTable merge_tree_table{table};
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str;
Expand All @@ -120,7 +123,7 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
auto merge_tree_table = parseMergeTreeTable(extension_table);
// ignore snapshot id for query
merge_tree_table.snapshot_id = "";
auto storage = parseStorage(merge_tree_table, global_context, true);
auto storage = parseStorageAndRestore(merge_tree_table, global_context);

DB::Block input;
if (rel.has_base_schema() && rel.base_schema().names_size())
Expand Down Expand Up @@ -368,22 +371,27 @@ void MergeTreeRelParser::collectColumns(const substrait::Expression & rel, NameS
}
}


String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func)
String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func) const
{
auto func_signature = getPlanParser()->function_mapping.at(std::to_string(substrait_func.function_reference()));
return getPlanParser()->getFunctionName(func_signature, substrait_func);
}

MergeTreeTableInstance xparseMergeTreeTable(const substrait::extensions::AdvancedExtension & extension)
{
logDebugMessage(extension, "merge_tree_table");

google::protobuf::StringValue table;
table.ParseFromString(extension.enhancement().value());
return parseMergeTreeTableString(table.value());
}

String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_rel)
{
google::protobuf::StringValue table;
table.ParseFromString(read_rel.advanced_extension().enhancement().value());
auto merge_tree_table = parseMergeTreeTableString(table.value());
auto merge_tree_table = xparseMergeTreeTable(read_rel.advanced_extension());
// ignore snapshot id for query
merge_tree_table.snapshot_id = "";
auto custom_storage_mergetree = parseStorage(merge_tree_table, global_context, true);
auto custom_storage_mergetree = parseStorageAndRestore(merge_tree_table, global_context);

auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
auto names_and_types_list = input.getNamesAndTypesList();
Expand Down
19 changes: 9 additions & 10 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,18 @@ using namespace DB;
class MergeTreeRelParser : public RelParser
{
public:
static CustomStorageMergeTreePtr parseStorage(
const MergeTreeTable & merge_tree_table, ContextMutablePtr context, bool restore = false);
static CustomStorageMergeTreePtr parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context);
static CustomStorageMergeTreePtr
parseStorageAndRestore(const MergeTreeTableInstance & merge_tree_table, const ContextMutablePtr & context);

// Create random table name and table path and use default storage policy.
// In insert case, mergetree data can be uploaded after merges in default storage(Local Disk).
static CustomStorageMergeTreePtr
copyToDefaultPolicyStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context);
static CustomStorageMergeTreePtr copyToDefaultPolicyStorage(const MergeTreeTable & table, ContextMutablePtr context);

// Use same table path and data path as the originial table.
static CustomStorageMergeTreePtr
copyToVirtualStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context);
// Use same table path and data path as the original table.
static CustomStorageMergeTreePtr copyToVirtualStorage(const MergeTreeTable & table, const ContextMutablePtr & context);

static MergeTreeTable parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table);
static MergeTreeTableInstance parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table);

explicit MergeTreeRelParser(SerializedPlanParser * plan_paser_, const ContextPtr & context_)
: RelParser(plan_paser_), context(context_), global_context(plan_paser_->global_context)
Expand Down Expand Up @@ -100,8 +99,8 @@ class MergeTreeRelParser : public RelParser
void parseToAction(ActionsDAG & filter_action, const substrait::Expression & rel, std::string & filter_name);
PrewhereInfoPtr parsePreWhereInfo(const substrait::Expression & rel, Block & input);
ActionsDAG optimizePrewhereAction(const substrait::Expression & rel, std::string & filter_name, Block & block);
String getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func);
void collectColumns(const substrait::Expression & rel, NameSet & columns, Block & block);
String getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func) const;
static void collectColumns(const substrait::Expression & rel, NameSet & columns, Block & block);
UInt64 getColumnsSize(const NameSet & columns);

const ContextPtr & context;
Expand Down
12 changes: 6 additions & 6 deletions cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ void CacheManager::initialize(DB::ContextMutablePtr context_)

struct CacheJobContext
{
MergeTreeTable table;
MergeTreeTableInstance table;
};

Task CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& part, const std::unordered_set<String> & columns)
Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns)
{
CacheJobContext job_context{table};
job_context.table.parts.clear();
Expand All @@ -88,7 +88,8 @@ Task CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& p
{
try
{
auto storage = MergeTreeRelParser::parseStorage(job_detail.table, context, true);
auto storage = MergeTreeRelParser::parseStorageAndRestore(job_detail.table, context);

auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
NamesAndTypesList names_and_types_list;
auto meta_columns = storage->getInMemoryMetadata().getColumns();
Expand Down Expand Up @@ -132,9 +133,8 @@ Task CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& p
return std::move(task);
}

JobId CacheManager::cacheParts(const String& table_def, const std::unordered_set<String>& columns)
JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String>& columns)
{
auto table = parseMergeTreeTableString(table_def);
JobId id = toString(UUIDHelpers::generateV4());
Job job(id);
for (const auto & part : table.parts)
Expand All @@ -148,7 +148,7 @@ JobId CacheManager::cacheParts(const String& table_def, const std::unordered_set

jobject CacheManager::getCacheStatus(JNIEnv * env, const String & jobId)
{
auto& scheduler = JobScheduler::instance();
auto & scheduler = JobScheduler::instance();
auto job_status = scheduler.getJobSatus(jobId);
int status = 0;
String message;
Expand Down
18 changes: 9 additions & 9 deletions cpp-ch/local-engine/Storages/Cache/CacheManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,37 @@

#include <substrait/plan.pb.h>

#include <jni.h>
#include <Disks/IDisk.h>
#include <Storages/Cache/JobScheduler.h>
#include <jni.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>

namespace local_engine
{
struct MergeTreePart;
struct MergeTreeTable;


struct MergeTreeTableInstance;

/***
* Manage the cache of the MergeTree, mainly including meta.bin, data.bin, metadata.gluten
*/
class CacheManager {
class CacheManager
{
public:
static jclass cache_result_class;
static jmethodID cache_result_constructor;
static void initJNI(JNIEnv* env);
static void initJNI(JNIEnv * env);

static CacheManager & instance();
static void initialize(DB::ContextMutablePtr context);
Task cachePart(const MergeTreeTable& table, const MergeTreePart& part, const std::unordered_set<String>& columns);
JobId cacheParts(const String& table_def, const std::unordered_set<String>& columns);
static jobject getCacheStatus(JNIEnv * env, const String& jobId);
JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String> & columns);
static jobject getCacheStatus(JNIEnv * env, const String & jobId);

Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder);
JobId cacheFiles(substrait::ReadRel::LocalFiles file_infos);
static void removeFiles(String file, String cache_name);

private:
Task cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns);
CacheManager() = default;
DB::ContextMutablePtr context;
};
Expand Down
22 changes: 14 additions & 8 deletions cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,9 @@ std::unique_ptr<SelectQueryInfo> buildQueryInfo(NamesAndTypesList & names_and_ty
return query_info;
}


MergeTreeTable parseMergeTreeTableString(const std::string & info)
void parseMergeTreeTableString(MergeTreeTable & table, ReadBufferFromString & in)
{
ReadBufferFromString in(info);
assertString("MergeTree;", in);
MergeTreeTable table;
readString(table.database, in);
assertChar('\n', in);
readString(table.table, in);
Expand Down Expand Up @@ -189,6 +186,14 @@ MergeTreeTable parseMergeTreeTableString(const std::string & info)
readString(json, in);
parseTableConfig(table.table_configs, json);
assertChar('\n', in);
}

MergeTreeTableInstance parseMergeTreeTableString(const std::string & info)
{
MergeTreeTableInstance result;
ReadBufferFromString in(info);
parseMergeTreeTableString(result, in);

while (!in.eof())
{
MergeTreePart part;
Expand All @@ -198,20 +203,21 @@ MergeTreeTable parseMergeTreeTableString(const std::string & info)
assertChar('\n', in);
readIntText(part.end, in);
assertChar('\n', in);
table.parts.emplace_back(part);
result.parts.emplace_back(part);
}
return table;

return result;
}

std::unordered_set<String> MergeTreeTable::getPartNames() const
std::unordered_set<String> MergeTreeTableInstance::getPartNames() const
{
std::unordered_set<String> names;
for (const auto & part : parts)
names.emplace(part.name);
return names;
}

RangesInDataParts MergeTreeTable::extractRange(DataPartsVector parts_vector) const
RangesInDataParts MergeTreeTableInstance::extractRange(DataPartsVector parts_vector) const
{
std::unordered_map<String, DataPartPtr> name_index;
std::ranges::for_each(parts_vector, [&](const DataPartPtr & part) { name_index.emplace(part->name, part); });
Expand Down
16 changes: 12 additions & 4 deletions cpp-ch/local-engine/Storages/MergeTree/MergeTreeTool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
#include <Storages/StorageInMemoryMetadata.h>
#include <substrait/plan.pb.h>

namespace DB
{
class ReadBufferFromString;
}
namespace local_engine
{
using namespace DB;
Expand All @@ -44,7 +48,7 @@ struct MergeTreeTableSettings

struct MergeTreeTable
{
inline static const std::string TUPLE = "tuple()";
static constexpr std::string_view TUPLE = "tuple()";
std::string database;
std::string table;
std::string snapshot_id;
Expand All @@ -58,10 +62,15 @@ struct MergeTreeTable
std::string relative_path;
std::string absolute_path;
MergeTreeTableSettings table_configs;

bool sameStructWith(const MergeTreeTable & other) const;
};

struct MergeTreeTableInstance : MergeTreeTable
{
std::vector<MergeTreePart> parts;
std::unordered_set<std::string> getPartNames() const;
RangesInDataParts extractRange(DataPartsVector parts_vector) const;
bool sameStructWith(const MergeTreeTable & other) const;
};

std::shared_ptr<DB::StorageInMemoryMetadata>
Expand All @@ -71,6 +80,5 @@ std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings(const MergeTreeTableSe

std::unique_ptr<SelectQueryInfo> buildQueryInfo(NamesAndTypesList & names_and_types_list);

MergeTreeTable parseMergeTreeTableString(const std::string & info);

MergeTreeTableInstance parseMergeTreeTableString(const std::string & info);
}
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ std::unordered_map<String, String> extractPartMetaData(ReadBuffer & in)
return result;
}

void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context)
void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context)
{
const auto data_disk = storage->getStoragePolicy()->getAnyDisk();
if (!data_disk->isRemote())
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
namespace local_engine
{

void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context);
void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context);

void saveFileStatus(
const DB::MergeTreeData & storage, const DB::ContextPtr & context, const String & part_name, IDataPartStorage & data_part_storage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ CustomStorageMergeTreePtr StorageMergeTreeFactory::getStorage(
const auto table_name = getTableName(id, snapshot_id);
std::lock_guard lock(storage_map_mutex);

merge_tree_table.parts.clear();
// merge_tree_table.parts.clear();
if (storage_map->has(table_name) && !storage_map->get(table_name)->second.sameStructWith(merge_tree_table))
{
freeStorage(id);
Expand Down
3 changes: 2 additions & 1 deletion cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,8 @@ Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv *
std::unordered_set<String> column_set;
for (const auto & col : tokenizer)
column_set.insert(col);
auto id = local_engine::CacheManager::instance().cacheParts(table_def, column_set);
auto table = local_engine::parseMergeTreeTableString(table_def);
auto id = local_engine::CacheManager::instance().cacheParts(table, column_set);
return local_engine::charTojstring(env, id.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ TEST(WritePipeline, SparkMergeTree)
merging_params,
std::move(storage_settings));*/

GlutenMergeTreeWriteSettings gm_write_settings{
MergeTreePartitionWriteSettings gm_write_settings{
.part_name_prefix{"this_is_prefix"},
};
gm_write_settings.set(context);
Expand Down

0 comments on commit 1db8664

Please sign in to comment.