Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refresh table info after zk updated #3328

Merged
merged 9 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/cmd/sql_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ TEST_F(SqlCmdTest, SelectMultiPartition) {
}

TEST_F(SqlCmdTest, ShowNameserverJob) {
auto sr = cluster_cli.sr;
sr = cluster_cli.sr;
std::string db_name = "test" + GenRand();
std::string name = "table" + GenRand();
std::string ddl = "create table " + name +
Expand Down Expand Up @@ -3826,6 +3826,7 @@ int main(int argc, char** argv) {
::openmldb::sdk::ClusterOptions copt;
copt.zk_cluster = mc.GetZkCluster();
copt.zk_path = mc.GetZkPath();
copt.zk_session_timeout = FLAGS_zk_session_timeout;
::openmldb::cmd::cluster_cli.cs = new ::openmldb::sdk::ClusterSDK(copt);
::openmldb::cmd::cluster_cli.cs->Init();
::openmldb::cmd::cluster_cli.sr = new ::openmldb::sdk::SQLClusterRouter(::openmldb::cmd::cluster_cli.cs);
Expand Down
38 changes: 29 additions & 9 deletions src/nameserver/name_server_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@
#include "nameserver/name_server_impl.h"

#include <algorithm>
#include <set>
#include <random>
#include <iterator>
#include <iostream>
#include <iterator>
#include <random>
#include <set>
#include <vector>


#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/numbers.h"
#include "absl/time/time.h"
#include "nameserver/system_table.h"
#include "sdk/db_sdk.h"
Expand All @@ -44,10 +43,10 @@
#include "base/strings.h"
#include "boost/algorithm/string.hpp"
#include "boost/bind.hpp"
#include "codec/row_codec.h"
#include "gflags/gflags.h"
#include "schema/index_util.h"
#include "schema/schema_adapter.h"
#include "codec/row_codec.h"

DECLARE_string(endpoint);
DECLARE_string(zk_cluster);
Expand Down Expand Up @@ -75,7 +74,6 @@
DECLARE_bool(enable_distsql);
DECLARE_uint32(sync_deploy_stats_timeout);


namespace openmldb {
namespace nameserver {

Expand Down Expand Up @@ -8584,6 +8582,24 @@
}
}
UpdateZkTableNode(table_info);
// refresh tablet here, cuz this func may be called by task
dl239 marked this conversation as resolved.
Show resolved Hide resolved
// if refresh failed, won't break the process of add index
std::set<std::string> endpoint_set;
for (const auto& part : table_info->table_partition()) {
for (const auto& meta : part.partition_meta()) {
endpoint_set.insert(meta.endpoint());
}
}
// locked on top
for (const auto& tablet : tablets_) {
if (!tablet.second->Health()) {
continue;

Check warning on line 8596 in src/nameserver/name_server_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/nameserver/name_server_impl.cc#L8596

Added line #L8596 was not covered by tests
}
if (endpoint_set.count(tablet.first) == 0) {
tablet.second->client_->Refresh(table_info->tid());
}
}

PDLOG(INFO, "add index ok. table %s index cnt %d", name.c_str(), column_key.size());
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kDone);
Expand Down Expand Up @@ -9561,9 +9577,8 @@
}
}

std::shared_ptr<TabletInfo> NameServerImpl::GetTablet(const std::string& endpoint) {
std::shared_ptr<TabletInfo> NameServerImpl::GetTabletUnlock(const std::string& endpoint) {
std::shared_ptr<TabletInfo> tablet_ptr;
std::lock_guard<std::mutex> lock(mu_);
auto iter = tablets_.find(endpoint);
// check tablet if exist
if (iter == tablets_.end()) {
Expand All @@ -9577,6 +9592,11 @@
return tablet_ptr;
}

std::shared_ptr<TabletInfo> NameServerImpl::GetTablet(const std::string& endpoint) {
std::lock_guard<std::mutex> lock(mu_);
return GetTabletUnlock(endpoint);
}

void NameServerImpl::CreateDatabaseOrExit(const std::string& db) {
auto status = CreateDatabase(db, true);
if (!status.OK() && status.code != ::openmldb::base::ReturnCode::kDatabaseAlreadyExists) {
Expand Down
1 change: 1 addition & 0 deletions src/nameserver/name_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ class NameServerImpl : public NameServer {

void DropProcedureOnTablet(const std::string& db_name, const std::string& sp_name);

std::shared_ptr<TabletInfo> GetTabletUnlock(const std::string& endpoint);
std::shared_ptr<TabletInfo> GetTablet(const std::string& endpoint);

std::vector<std::shared_ptr<TabletInfo>> GetAllHealthTablet();
Expand Down
2 changes: 1 addition & 1 deletion src/test/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void ProcessSQLs(sdk::SQLRouter* sr, std::initializer_list<absl::string_view> sq

void ExpectResultSetStrEq(const std::vector<std::vector<CellExpectInfo>>& expect, hybridse::sdk::ResultSet* rs,
bool ordered) {
ASSERT_EQ(expect.size(), rs->Size() + 1);
ASSERT_EQ(expect.size(), static_cast<uint64_t>(rs->Size()) + 1);
size_t idx = 0;
// schema check
ASSERT_EQ(expect.front().size(), rs->GetSchema()->GetColumnCnt());
Expand Down
Loading