Skip to content

Commit

Permalink
refact: update delete interface
Browse files Browse the repository at this point in the history
  • Loading branch information
dl239 committed Jun 19, 2023
1 parent 3640df8 commit f8d589d
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 112 deletions.
43 changes: 18 additions & 25 deletions src/storage/aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ std::string AggrStatToString(AggrStat type) {

Aggregator::Aggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size)
: base_table_schema_(base_meta.column_desc()),
aggr_table_schema_(aggr_meta.column_desc()),
Expand Down Expand Up @@ -104,19 +104,11 @@ bool Aggregator::Update(const std::string& key, const std::string& row, uint64_t
}
auto row_ptr = reinterpret_cast<const int8_t*>(row.c_str());
int64_t cur_ts = 0;
switch (ts_col_type_) {
case DataType::kBigInt: {
base_row_view_.GetValue(row_ptr, ts_col_idx_, DataType::kBigInt, &cur_ts);
break;
}
case DataType::kTimestamp: {
base_row_view_.GetValue(row_ptr, ts_col_idx_, DataType::kTimestamp, &cur_ts);
break;
}
default: {
PDLOG(ERROR, "Unsupported timestamp data type");
return false;
}
if (ts_col_type_ == DataType::kBigInt || ts_col_type_ == DataType::kTimestamp) {
base_row_view_.GetValue(row_ptr, ts_col_idx_, ts_col_type_, &cur_ts);
} else {
PDLOG(ERROR, "Unsupported timestamp data type");
return false;
}
std::string filter_key = "";
if (filter_col_idx_ != -1) {
Expand Down Expand Up @@ -227,21 +219,22 @@ bool Aggregator::Delete(const std::string& key) {
dimension->set_idx(aggr_index_pos_);

// delete the entries from the pre-aggr table
bool ok = aggr_table_->Delete(entry);
if (!ok) {
if (!aggr_table_->Delete(entry)) {
PDLOG(ERROR, "Delete key %s from aggr table %s failed", key, aggr_table_->GetName());
return false;
}

ok = aggr_replicator_->AppendEntry(entry);
if (!ok) {
if (!aggr_replicator_->AppendEntry(entry)) {
PDLOG(ERROR, "Add Delete entry to binlog failed: key %s, aggr table %s", key, aggr_table_->GetName());
return false;
}
if (FLAGS_binlog_notify_on_put) {
aggr_replicator_->Notify();
}
return true;
}

bool Aggregator::Delete(const std::string& key, const std::optional<uint64_t>& start_ts,
const std::optional<uint64_t>& end_ts) {
return true;
}

Expand Down Expand Up @@ -588,7 +581,7 @@ bool Aggregator::CheckBufferFilled(int64_t cur_ts, int64_t buffer_end, int32_t b

SumAggregator::SumAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size)
: Aggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col, window_tpye,
window_size) {}
Expand Down Expand Up @@ -702,7 +695,7 @@ bool SumAggregator::DecodeAggrVal(const int8_t* row_ptr, AggrBuffer* buffer) {
MinMaxBaseAggregator::MinMaxBaseAggregator(const ::openmldb::api::TableMeta& base_meta,
const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table,
std::shared_ptr<LogReplicator> aggr_replicator, const uint32_t& index_pos,
std::shared_ptr<LogReplicator> aggr_replicator, uint32_t index_pos,
const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size)
: Aggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col, window_tpye,
Expand Down Expand Up @@ -808,7 +801,7 @@ bool MinMaxBaseAggregator::DecodeAggrVal(const int8_t* row_ptr, AggrBuffer* buff

MinAggregator::MinAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size)
: MinMaxBaseAggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col,
window_tpye, window_size) {}
Expand Down Expand Up @@ -890,7 +883,7 @@ bool MinAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t*

MaxAggregator::MaxAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size)
: MinMaxBaseAggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col,
window_tpye, window_size) {}
Expand Down Expand Up @@ -972,7 +965,7 @@ bool MaxAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t*

CountAggregator::CountAggregator(const ::openmldb::api::TableMeta& base_meta,
const ::openmldb::api::TableMeta& aggr_meta, std::shared_ptr<Table> aggr_table,
std::shared_ptr<LogReplicator> aggr_replicator, const uint32_t& index_pos,
std::shared_ptr<LogReplicator> aggr_replicator, uint32_t index_pos,
const std::string& aggr_col, const AggrType& aggr_type, const std::string& ts_col,
WindowType window_tpye, uint32_t window_size)
: Aggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col, window_tpye,
Expand Down Expand Up @@ -1007,7 +1000,7 @@ bool CountAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t

AvgAggregator::AvgAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size)
: Aggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col, window_tpye,
window_size) {}
Expand Down
16 changes: 9 additions & 7 deletions src/storage/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class Aggregator {
public:
Aggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);

~Aggregator();
Expand All @@ -131,6 +131,8 @@ class Aggregator {

bool Delete(const std::string& key);

bool Delete(const std::string& key, const std::optional<uint64_t>& start_ts, const std::optional<uint64_t>& end_ts);

bool FlushAll();

bool Init(std::shared_ptr<LogReplicator> base_replicator);
Expand Down Expand Up @@ -215,7 +217,7 @@ class SumAggregator : public Aggregator {
public:
SumAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);

~SumAggregator() = default;
Expand All @@ -232,7 +234,7 @@ class MinMaxBaseAggregator : public Aggregator {
public:
MinMaxBaseAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);

~MinMaxBaseAggregator() = default;
Expand All @@ -246,7 +248,7 @@ class MinAggregator : public MinMaxBaseAggregator {
public:
MinAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);

~MinAggregator() = default;
Expand All @@ -259,7 +261,7 @@ class MaxAggregator : public MinMaxBaseAggregator {
public:
MaxAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);

~MaxAggregator() = default;
Expand All @@ -272,7 +274,7 @@ class CountAggregator : public Aggregator {
public:
CountAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);

~CountAggregator() = default;
Expand All @@ -291,7 +293,7 @@ class AvgAggregator : public Aggregator {
public:
AvgAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);

~AvgAggregator() = default;
Expand Down
26 changes: 13 additions & 13 deletions src/storage/disk_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,17 +283,14 @@ bool DiskTable::Put(uint64_t time, const std::string& value, const Dimensions& d
}

bool DiskTable::Delete(const ::openmldb::api::LogEntry& entry) {
uint64_t start_ts = entry.has_ts() ? entry.ts() : UINT64_MAX;
std::optional<uint64_t> start_ts = entry.has_ts() ? std::optional<uint64_t>(entry.ts()) : std::nullopt;
std::optional<uint64_t> end_ts = entry.has_end_ts() ? std::optional<uint64_t>(entry.end_ts()) : std::nullopt;
if (entry.dimensions_size() > 0) {
for (const auto& dimension : entry.dimensions()) {
auto s = Delete(dimension.idx(), dimension.key(), start_ts, end_ts);
if (!s.OK()) {
DEBUGLOG("Delete failed. tid %u pid %u msg %s", id_, pid_, s.GetMsg().c_str());
if (!Delete(dimension.idx(), dimension.key(), start_ts, end_ts)) {
return false;
}
}
offset_.fetch_add(1, std::memory_order_relaxed);
return true;
} else {
for (const auto& index : table_index_.GetAllIndex()) {
Expand All @@ -312,12 +309,13 @@ bool DiskTable::Delete(const ::openmldb::api::LogEntry& entry) {
return true;
}

base::Status DiskTable::Delete(uint32_t idx, const std::string& pk,
uint64_t start_ts, const std::optional<uint64_t>& end_ts) {
bool DiskTable::Delete(uint32_t idx, const std::string& pk,
const std::optional<uint64_t>& start_ts, const std::optional<uint64_t>& end_ts) {
auto index_def = table_index_.GetIndex(idx);
if (!index_def || !index_def->IsReady()) {
return {-1, "index not found"};
return false;
}
uint64_t real_start_ts = start_ts.has_value() ? start_ts.value() : UINT64_MAX;
uint64_t real_end_ts = end_ts.has_value() ? end_ts.value() : 0;
std::string combine_key1;
std::string combine_key2;
Expand All @@ -327,22 +325,24 @@ base::Status DiskTable::Delete(uint32_t idx, const std::string& pk,
for (const auto& index : indexs) {
auto ts_col = index->GetTsColumn();
if (!ts_col) {
return {-1, "ts column not found"};
return false;
}
combine_key1 = CombineKeyTs(pk, start_ts, ts_col->GetId());
combine_key1 = CombineKeyTs(pk, real_start_ts, ts_col->GetId());
combine_key2 = CombineKeyTs(pk, real_end_ts, ts_col->GetId());
}
} else {
combine_key1 = CombineKeyTs(pk, start_ts);
combine_key1 = CombineKeyTs(pk, real_start_ts);
combine_key2 = CombineKeyTs(pk, real_end_ts);
}
rocksdb::WriteBatch batch;
batch.DeleteRange(cf_hs_[idx + 1], rocksdb::Slice(combine_key1), rocksdb::Slice(combine_key2));
rocksdb::Status s = db_->Write(write_opts_, &batch);
if (!s.ok()) {
return {-1, s.ToString()};
DEBUGLOG("Delete failed. tid %u pid %u msg %s", id_, pid_, s.ToString().c_str());
return false;
}
return {};
offset_.fetch_add(1, std::memory_order_relaxed);
return true;
}

bool DiskTable::Get(uint32_t idx, const std::string& pk, uint64_t ts, std::string& value) {
Expand Down
5 changes: 2 additions & 3 deletions src/storage/disk_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ class DiskTable : public Table {
bool Get(const std::string& pk, uint64_t ts, std::string& value); // NOLINT

bool Delete(const ::openmldb::api::LogEntry& entry) override;
bool Delete(uint32_t idx, const std::string& pk,
const std::optional<uint64_t>& start_ts, const std::optional<uint64_t>& end_ts) override;

uint64_t GetExpireTime(const TTLSt& ttl_st) override;

Expand Down Expand Up @@ -230,9 +232,6 @@ class DiskTable : public Table {

int GetCount(uint32_t index, const std::string& pk, uint64_t& count) override; // NOLINT

private:
base::Status Delete(uint32_t idx, const std::string& pk, uint64_t start_ts, const std::optional<uint64_t>& end_ts);

private:
rocksdb::DB* db_;
rocksdb::WriteOptions write_opts_;
Expand Down
Loading

0 comments on commit f8d589d

Please sign in to comment.