Skip to content

Commit

Permalink
fix: refresh table info after zk updated (#3328)
Browse files Browse the repository at this point in the history
  • Loading branch information
vagetablechicken committed Jun 26, 2023
1 parent 4b88184 commit eb14873
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/cmd/sql_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ TEST_F(SqlCmdTest, SelectMultiPartition) {
}

TEST_F(SqlCmdTest, ShowNameserverJob) {
auto sr = cluster_cli.sr;
sr = cluster_cli.sr;
std::string db_name = "test" + GenRand();
std::string name = "table" + GenRand();
std::string ddl = "create table " + name +
Expand Down Expand Up @@ -3824,6 +3824,7 @@ int main(int argc, char** argv) {
::openmldb::sdk::ClusterOptions copt;
copt.zk_cluster = mc.GetZkCluster();
copt.zk_path = mc.GetZkPath();
copt.zk_session_timeout = FLAGS_zk_session_timeout;
::openmldb::cmd::cluster_cli.cs = new ::openmldb::sdk::ClusterSDK(copt);
::openmldb::cmd::cluster_cli.cs->Init();
::openmldb::cmd::cluster_cli.sr = new ::openmldb::sdk::SQLClusterRouter(::openmldb::cmd::cluster_cli.cs);
Expand Down
38 changes: 29 additions & 9 deletions src/nameserver/name_server_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@
#include "nameserver/name_server_impl.h"

#include <algorithm>
#include <set>
#include <random>
#include <iterator>
#include <iostream>
#include <iterator>
#include <random>
#include <set>
#include <vector>


#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/numbers.h"
#include "absl/time/time.h"
#include "nameserver/system_table.h"
#include "sdk/db_sdk.h"
Expand All @@ -44,10 +43,10 @@
#include "base/strings.h"
#include "boost/algorithm/string.hpp"
#include "boost/bind.hpp"
#include "codec/row_codec.h"
#include "gflags/gflags.h"
#include "schema/index_util.h"
#include "schema/schema_adapter.h"
#include "codec/row_codec.h"

DECLARE_string(endpoint);
DECLARE_string(zk_cluster);
Expand Down Expand Up @@ -75,7 +74,6 @@ DECLARE_bool(use_name);
DECLARE_bool(enable_distsql);
DECLARE_uint32(sync_deploy_stats_timeout);


namespace openmldb {
namespace nameserver {

Expand Down Expand Up @@ -8584,6 +8582,24 @@ bool NameServerImpl::AddIndexToTableInfo(const std::string& name, const std::str
}
}
UpdateZkTableNode(table_info);
// refresh tablet here, cuz this func may be called by task
// if refresh failed, won't break the process of add index
std::set<std::string> endpoint_set;
for (const auto& part : table_info->table_partition()) {
for (const auto& meta : part.partition_meta()) {
endpoint_set.insert(meta.endpoint());
}
}
// locked on top
for (const auto& tablet : tablets_) {
if (!tablet.second->Health()) {
continue;
}
if (endpoint_set.count(tablet.first) == 0) {
tablet.second->client_->Refresh(table_info->tid());
}
}

PDLOG(INFO, "add index ok. table %s index cnt %d", name.c_str(), column_key.size());
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kDone);
Expand Down Expand Up @@ -9561,9 +9577,8 @@ void NameServerImpl::ShowProcedure(RpcController* controller, const api::ShowPro
}
}

std::shared_ptr<TabletInfo> NameServerImpl::GetTablet(const std::string& endpoint) {
std::shared_ptr<TabletInfo> NameServerImpl::GetTabletUnlock(const std::string& endpoint) {
std::shared_ptr<TabletInfo> tablet_ptr;
std::lock_guard<std::mutex> lock(mu_);
auto iter = tablets_.find(endpoint);
// check tablet if exist
if (iter == tablets_.end()) {
Expand All @@ -9577,6 +9592,11 @@ std::shared_ptr<TabletInfo> NameServerImpl::GetTablet(const std::string& endpoin
return tablet_ptr;
}

std::shared_ptr<TabletInfo> NameServerImpl::GetTablet(const std::string& endpoint) {
std::lock_guard<std::mutex> lock(mu_);
return GetTabletUnlock(endpoint);
}

void NameServerImpl::CreateDatabaseOrExit(const std::string& db) {
auto status = CreateDatabase(db, true);
if (!status.OK() && status.code != ::openmldb::base::ReturnCode::kDatabaseAlreadyExists) {
Expand Down
1 change: 1 addition & 0 deletions src/nameserver/name_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ class NameServerImpl : public NameServer {

void DropProcedureOnTablet(const std::string& db_name, const std::string& sp_name);

std::shared_ptr<TabletInfo> GetTabletUnlock(const std::string& endpoint);
std::shared_ptr<TabletInfo> GetTablet(const std::string& endpoint);

std::vector<std::shared_ptr<TabletInfo>> GetAllHealthTablet();
Expand Down
2 changes: 1 addition & 1 deletion src/test/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void ProcessSQLs(sdk::SQLRouter* sr, std::initializer_list<absl::string_view> sq

void ExpectResultSetStrEq(const std::vector<std::vector<CellExpectInfo>>& expect, hybridse::sdk::ResultSet* rs,
bool ordered) {
ASSERT_EQ(expect.size(), rs->Size() + 1);
ASSERT_EQ(expect.size(), static_cast<uint64_t>(rs->Size()) + 1);
size_t idx = 0;
// schema check
ASSERT_EQ(expect.front().size(), rs->GetSchema()->GetColumnCnt());
Expand Down

0 comments on commit eb14873

Please sign in to comment.