Skip to content

Commit

Permalink
Refactor Make static member to normal member
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Sep 5, 2024
1 parent b758f9b commit 67f377b
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 104 deletions.
13 changes: 6 additions & 7 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ static Int64 findMinPosition(const NameSet & condition_table_columns, const Name
DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const substrait::ReadRel::ExtensionTable & extension_table)
{
auto merge_tree_table = MergeTreeTableInstance::parseMergeTreeTable(extension_table);
MergeTreeTableInstance merge_tree_table(extension_table);
// ignore snapshot id for query
merge_tree_table.snapshot_id = "";
auto storage = MergeTreeTableInstance::restoreStorage(merge_tree_table, global_context);
auto storage = merge_tree_table.restoreStorage(global_context);

DB::Block input;
if (rel.has_base_schema() && rel.base_schema().names_size())
Expand Down Expand Up @@ -316,10 +316,10 @@ String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarF

String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_rel)
{
auto merge_tree_table = MergeTreeTableInstance::parseFromAny(read_rel.advanced_extension().enhancement());
MergeTreeTableInstance merge_tree_table(read_rel.advanced_extension().enhancement());
// ignore snapshot id for query
merge_tree_table.snapshot_id = "";
auto custom_storage_mergetree = MergeTreeTableInstance::restoreStorage(merge_tree_table, global_context);
auto storage = merge_tree_table.restoreStorage(global_context);

auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
auto names_and_types_list = input.getNamesAndTypesList();
Expand All @@ -330,11 +330,10 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_
std::vector<DataPartPtr> selected_parts = StorageMergeTreeFactory::getDataPartsByNames(
StorageID(merge_tree_table.database, merge_tree_table.table), merge_tree_table.snapshot_id, merge_tree_table.getPartNames());

auto storage_snapshot
= std::make_shared<StorageSnapshot>(*custom_storage_mergetree, custom_storage_mergetree->getInMemoryMetadataPtr());
auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
if (selected_parts.empty())
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found.");
auto read_step = custom_storage_mergetree->reader.readFromParts(
auto read_step = storage->reader.readFromParts(
selected_parts,
/* alter_conversions = */
{},
Expand Down
8 changes: 7 additions & 1 deletion cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


#include "SubstraitParserUtils.h"
#include <Common/Exception.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/logger_useful.h>

using namespace DB;
Expand All @@ -38,4 +38,10 @@ void logDebugMessage(const google::protobuf::Message & message, const char * typ
LOG_DEBUG(logger, "{}:\n{}", type, json);
}
}
std::string toString(const google::protobuf::Any & any)
{
google::protobuf::StringValue sv;
sv.ParseFromString(any.value());
return sv.value();
}
}
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Parser/SubstraitParserUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,5 @@ Message BinaryToMessage(const std::string_view binary)

void logDebugMessage(const google::protobuf::Message & message, const char * type);

std::string toString(const google::protobuf::Any & any);
} // namespace local_engine
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr
{
try
{
auto storage = MergeTreeTableInstance::restoreStorage(job_detail.table, context);
auto storage = job_detail.table.restoreStorage(context);

auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
NamesAndTypesList names_and_types_list;
Expand Down
121 changes: 55 additions & 66 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
*/
#include "SparkMergeTreeMeta.h"

#include <google/protobuf/util/json_util.h>
#include <google/protobuf/wrappers.pb.h>
#include <rapidjson/document.h>

#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Parser/SubstraitParserUtils.h>
Expand All @@ -29,6 +25,8 @@
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MetaDataHelper.h>
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <google/protobuf/util/json_util.h>
#include <rapidjson/document.h>
#include <Poco/StringTokenizer.h>

using namespace DB;
Expand Down Expand Up @@ -126,49 +124,6 @@ doBuildMetadata(const DB::NamesAndTypesList & columns, const ContextPtr & contex
return metadata;
}

}
namespace local_engine
{


SparkStorageMergeTreePtr MergeTreeTable::getStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context)
{
const DB::Block header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key);
const auto metadata = buildMetaData(header, context, merge_tree_table);

return StorageMergeTreeFactory::getStorage(
StorageID(merge_tree_table.database, merge_tree_table.table),
merge_tree_table.snapshot_id,
merge_tree_table,
[&]() -> SparkStorageMergeTreePtr
{
auto custom_storage_merge_tree = std::make_shared<SparkWriteStorageMergeTree>(merge_tree_table, *metadata, context);
return custom_storage_merge_tree;
});
}

SparkStorageMergeTreePtr MergeTreeTable::copyToDefaultPolicyStorage(const MergeTreeTable & table, ContextMutablePtr context)
{
MergeTreeTable merge_tree_table{table};
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str;
merge_tree_table.snapshot_id = "";
merge_tree_table.table_configs.storage_policy = "";
merge_tree_table.relative_path = merge_tree_table.relative_path + "_" + temp_uuid_str;
return getStorage(merge_tree_table, context);
}

SparkStorageMergeTreePtr MergeTreeTable::copyToVirtualStorage(const MergeTreeTable & table, const ContextMutablePtr & context)
{
MergeTreeTable merge_tree_table{table};
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str;
merge_tree_table.snapshot_id = "";
return getStorage(merge_tree_table, context);
}

void doParseMergeTreeTableString(MergeTreeTable & table, ReadBufferFromString & in)
{
assertString("MergeTree;", in);
Expand Down Expand Up @@ -207,11 +162,52 @@ void doParseMergeTreeTableString(MergeTreeTable & table, ReadBufferFromString &
assertChar('\n', in);
}

MergeTreeTableInstance MergeTreeTableInstance::parseMergeTreeTableString(const std::string & info)
}
namespace local_engine
{

SparkStorageMergeTreePtr MergeTreeTable::getStorage(ContextMutablePtr context) const
{
const DB::Block header = TypeParser::buildBlockFromNamedStruct(schema, low_card_key);
const auto metadata = buildMetaData(header, context);

return StorageMergeTreeFactory::getStorage(
StorageID(database, table),
snapshot_id,
*this,
[&]() -> SparkStorageMergeTreePtr
{
auto custom_storage_merge_tree = std::make_shared<SparkWriteStorageMergeTree>(*this, *metadata, context);
return custom_storage_merge_tree;
});
}

SparkStorageMergeTreePtr MergeTreeTable::copyToDefaultPolicyStorage(const ContextMutablePtr & context) const
{
MergeTreeTable merge_tree_table{*this};
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str;
merge_tree_table.snapshot_id = "";
merge_tree_table.table_configs.storage_policy = "";
merge_tree_table.relative_path = merge_tree_table.relative_path + "_" + temp_uuid_str;
return merge_tree_table.getStorage(context);
}

SparkStorageMergeTreePtr MergeTreeTable::copyToVirtualStorage(const ContextMutablePtr & context) const
{
MergeTreeTable merge_tree_table{*this};
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str;
merge_tree_table.snapshot_id = "";
return merge_tree_table.getStorage(context);
}

MergeTreeTableInstance::MergeTreeTableInstance(const std::string & info)
{
MergeTreeTableInstance result;
ReadBufferFromString in(info);
doParseMergeTreeTableString(result, in);
doParseMergeTreeTableString(*this, in);

while (!in.eof())
{
Expand All @@ -222,37 +218,30 @@ MergeTreeTableInstance MergeTreeTableInstance::parseMergeTreeTableString(const s
assertChar('\n', in);
readIntText(part.end, in);
assertChar('\n', in);
result.parts.emplace_back(part);
parts.emplace_back(part);
}

return result;
}

MergeTreeTableInstance MergeTreeTableInstance::parseFromAny(const google::protobuf::Any & any)
MergeTreeTableInstance::MergeTreeTableInstance(const google::protobuf::Any & any) : MergeTreeTableInstance(toString(any))
{
google::protobuf::StringValue table;
table.ParseFromString(any.value());
return parseMergeTreeTableString(table.value());
}

MergeTreeTableInstance MergeTreeTableInstance::parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table)
MergeTreeTableInstance::MergeTreeTableInstance(const substrait::ReadRel::ExtensionTable & extension_table)
: MergeTreeTableInstance(extension_table.detail())
{
logDebugMessage(extension_table, "merge_tree_table");
return parseFromAny(extension_table.detail());
}

SparkStorageMergeTreePtr
MergeTreeTableInstance::restoreStorage(const MergeTreeTableInstance & merge_tree_table, const ContextMutablePtr & context)
SparkStorageMergeTreePtr MergeTreeTableInstance::restoreStorage(const ContextMutablePtr & context) const
{
auto result = getStorage(merge_tree_table, context);
restoreMetaData(result, merge_tree_table, *context);
auto result = getStorage(context);
restoreMetaData(result, *this, *context);
return result;
}

std::shared_ptr<DB::StorageInMemoryMetadata>
buildMetaData(const DB::Block & header, const ContextPtr & context, const MergeTreeTable & table)
std::shared_ptr<DB::StorageInMemoryMetadata> MergeTreeTable::buildMetaData(const DB::Block & header, const ContextPtr & context) const
{
return doBuildMetadata(header.getNamesAndTypesList(), context, table);
return doBuildMetadata(header.getNamesAndTypesList(), context, *this);
}

std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings(const MergeTreeTableSettings & config)
Expand Down
26 changes: 13 additions & 13 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ struct MergeTreeTable

bool sameTable(const MergeTreeTable & other) const;

static SparkStorageMergeTreePtr getStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context);
SparkStorageMergeTreePtr getStorage(ContextMutablePtr context) const;

// Create random table name and table path and use default storage policy.
// In insert case, mergetree data can be uploaded after merges in default storage(Local Disk).
static SparkStorageMergeTreePtr copyToDefaultPolicyStorage(const MergeTreeTable & table, ContextMutablePtr context);
/// Create random table name and table path and use default storage policy.
/// In insert case, mergetree data can be uploaded after merges in default storage(Local Disk).
SparkStorageMergeTreePtr copyToDefaultPolicyStorage(const ContextMutablePtr & context) const;

// Use same table path and data path as the original table.
static SparkStorageMergeTreePtr copyToVirtualStorage(const MergeTreeTable & table, const ContextMutablePtr & context);
/// Use same table path and data path as the original table.
SparkStorageMergeTreePtr copyToVirtualStorage(const ContextMutablePtr & context) const;

std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const DB::Block & header, const ContextPtr & context) const;
};

struct MergeTreeTableInstance : MergeTreeTable
Expand All @@ -83,14 +85,12 @@ struct MergeTreeTableInstance : MergeTreeTable
std::unordered_set<std::string> getPartNames() const;
RangesInDataParts extractRange(DataPartsVector parts_vector) const;

static SparkStorageMergeTreePtr restoreStorage(const MergeTreeTableInstance & merge_tree_table, const ContextMutablePtr & context);
static MergeTreeTableInstance parseFromAny(const google::protobuf::Any & any);
static MergeTreeTableInstance parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table);
static MergeTreeTableInstance parseMergeTreeTableString(const std::string & info);
};
SparkStorageMergeTreePtr restoreStorage(const ContextMutablePtr & context) const;

std::shared_ptr<DB::StorageInMemoryMetadata>
buildMetaData(const DB::Block & header, const ContextPtr & context, const MergeTreeTable & table);
explicit MergeTreeTableInstance(const google::protobuf::Any & any);
explicit MergeTreeTableInstance(const substrait::ReadRel::ExtensionTable & extension_table);
explicit MergeTreeTableInstance(const std::string & info);
};

std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings(const MergeTreeTableSettings & config);

Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ void SparkMergeTreeSink::onFinish()
SinkHelperPtr SparkMergeTreeSink::create(
const MergeTreeTable & merge_tree_table, const SparkMergeTreeWriteSettings & write_settings_, const DB::ContextMutablePtr & context)
{
auto dest_storage = MergeTreeTable::getStorage(merge_tree_table, context);
auto dest_storage = merge_tree_table.getStorage(context);
bool isRemoteStorage = dest_storage->getStoragePolicy()->getAnyDisk()->isRemote();
bool insert_with_local_storage = !write_settings_.insert_without_local_storage;
if (insert_with_local_storage && isRemoteStorage)
{
auto temp = MergeTreeTable::copyToDefaultPolicyStorage(merge_tree_table, context);
auto temp = merge_tree_table.copyToDefaultPolicyStorage(context);
LOG_DEBUG(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Create temp table {} for local merge.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ std::unique_ptr<SparkMergeTreeWriter> SparkMergeTreeWriter::create(
const DB::ContextMutablePtr & context)
{
const DB::Settings & settings = context->getSettingsRef();
const auto dest_storage = MergeTreeTable::getStorage(merge_tree_table, context);
const auto dest_storage = merge_tree_table.getStorage(context);
StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr();
Block header = metadata_snapshot->getSampleBlock();
ASTPtr none;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ SinkToStoragePtr SparkWriteStorageMergeTree::write(
settings.load(context);
SinkHelperPtr sink_helper = SparkMergeTreeSink::create(table, settings, getContext());
#ifndef NDEBUG
auto dest_storage = MergeTreeTable::getStorage(table, getContext());
auto dest_storage = table.getStorage(getContext());
assert(dest_storage.get() == this);
#endif

Expand Down
13 changes: 5 additions & 8 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -901,9 +901,9 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_);
auto extension_table = local_engine::BinaryToMessage<substrait::ReadRel::ExtensionTable>(
{reinterpret_cast<const char *>(split_info_a.elems()), static_cast<size_t>(split_info_a.length())});
auto merge_tree_table = local_engine::MergeTreeTableInstance::parseMergeTreeTable(extension_table);

local_engine::MergeTreeTableInstance merge_tree_table(extension_table);
auto * writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, settings, query_context).release();

return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
Expand Down Expand Up @@ -995,13 +995,10 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
auto extension_table = local_engine::BinaryToMessage<substrait::ReadRel::ExtensionTable>(
{reinterpret_cast<const char *>(split_info_a.elems()), static_cast<size_t>(split_info_a.length())});

google::protobuf::StringValue table;
table.ParseFromString(extension_table.detail().value());
auto merge_tree_table = local_engine::MergeTreeTableInstance::parseMergeTreeTableString(table.value());

local_engine::MergeTreeTableInstance merge_tree_table(extension_table);
auto context = local_engine::QueryContextManager::instance().currentQueryContext();
// each task using its own CustomStorageMergeTree, don't reuse
auto temp_storage = local_engine::MergeTreeTable::copyToVirtualStorage(merge_tree_table, context);
auto temp_storage = merge_tree_table.copyToVirtualStorage(context);
// prefetch all needed parts metadata before merge
local_engine::restoreMetaData(temp_storage, merge_tree_table, *context);

Expand Down Expand Up @@ -1296,7 +1293,7 @@ Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv *
std::unordered_set<String> column_set;
for (const auto & col : tokenizer)
column_set.insert(col);
auto table = local_engine::MergeTreeTableInstance::parseMergeTreeTableString(table_def);
local_engine::MergeTreeTableInstance table(table_def);
auto id = local_engine::CacheManager::instance().cacheParts(table, column_set);
return local_engine::charTojstring(env, id.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
Expand Down
8 changes: 4 additions & 4 deletions cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ TEST(WritePipeline, SparkMergeTree)
const Settings & settings = context->getSettingsRef();

const auto extension_table = local_engine::JsonStringToMessage<substrait::ReadRel::ExtensionTable>(EMBEDDED_PLAN(_1_mergetree_));
const auto merge_tree_table = MergeTreeTableInstance::parseMergeTreeTable(extension_table);
MergeTreeTableInstance merge_tree_table(extension_table);

EXPECT_EQ(merge_tree_table.database, "default");
EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree");
Expand All @@ -396,7 +396,7 @@ TEST(WritePipeline, SparkMergeTree)

do_remove(merge_tree_table.relative_path);

const auto dest_storage = MergeTreeTable::getStorage(merge_tree_table, SerializedPlanParser::global_context);
const auto dest_storage = merge_tree_table.getStorage(SerializedPlanParser::global_context);
EXPECT_TRUE(dest_storage);
EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote());
DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr();
Expand Down Expand Up @@ -431,13 +431,13 @@ TEST(WritePipeline, SparkMergeTree)
{
const auto extension_table_hdfs
= local_engine::JsonStringToMessage<substrait::ReadRel::ExtensionTable>(EMBEDDED_PLAN(_1_mergetree_hdfs_));
const auto merge_tree_table_hdfs = MergeTreeTableInstance::parseMergeTreeTable(extension_table_hdfs);
MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs);
EXPECT_EQ(merge_tree_table_hdfs.database, "default");
EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs");
EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs");
EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main");

const auto dest_storage_hdfs = MergeTreeTable::getStorage(merge_tree_table_hdfs, SerializedPlanParser::global_context);
const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(SerializedPlanParser::global_context);
EXPECT_TRUE(dest_storage_hdfs);
EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote());
}
Expand Down

0 comments on commit 67f377b

Please sign in to comment.