Skip to content

Commit

Permalink
Metering for inactive partitions (#7199)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Jul 31, 2024
1 parent a0de26d commit 6f27c67
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 22 deletions.
28 changes: 21 additions & 7 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) {
);
}

ui64 TPartition::MeteringDataSize() const {
ui64 TPartition::UserDataSize() const {
if (DataKeysBody.size() <= 1) {
// tiny optimization - we do not meter very small queues up to 16MB
return 0;
Expand All @@ -260,25 +260,39 @@ ui64 TPartition::MeteringDataSize() const {
return size >= lastBlobSize ? size - lastBlobSize : 0;
}

// Returns the amount of data (in bytes) that is subject to metering at the moment `now`.
//
// An active partition, or any partition of a topic in METERING_MODE_REQUEST_UNITS,
// reports the whole UserDataSize(). Otherwise only the blobs whose timestamp is
// older than the retention deadline are summed up.
ui64 TPartition::MeteringDataSize(TInstant now) const {
    if (IsActive()) {
        return UserDataSize();
    }
    if (NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS == Config.GetMeteringMode()) {
        return UserDataSize();
    }

    // We only add the amount of data that is blocked by an important consumer:
    // such data has outlived the configured lifetime (plus WAKE_TIMEOUT) but is
    // still present in DataKeysBody.
    const auto lifetime = TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds());
    const auto deadline = now - lifetime - WAKE_TIMEOUT;

    ui64 blockedBytes = 0;
    // The scan starts from index 1 (the element at index 0 is not counted) and
    // stops at the first blob that is not older than the deadline.
    for (size_t idx = 1; idx < DataKeysBody.size(); ++idx) {
        const auto& blob = DataKeysBody[idx];
        if (!(blob.Timestamp < deadline)) {
            break;
        }
        blockedBytes += blob.Size;
    }
    return blockedBytes;
}

ui64 TPartition::ReserveSize() const {
return TopicPartitionReserveSize(Config);
return IsActive() ? TopicPartitionReserveSize(Config) : 0;
}

ui64 TPartition::StorageSize(const TActorContext&) const {
return std::max<ui64>(MeteringDataSize(), ReserveSize());
return std::max<ui64>(UserDataSize(), ReserveSize());
}

ui64 TPartition::UsedReserveSize(const TActorContext&) const {
return std::min<ui64>(MeteringDataSize(), ReserveSize());
return std::min<ui64>(UserDataSize(), ReserveSize());
}

ui64 TPartition::GetUsedStorage(const TInstant& now) {
const auto duration = now - LastUsedStorageMeterTimestamp;
LastUsedStorageMeterTimestamp = now;

auto dataSize = MeteringDataSize();
auto dataSize = MeteringDataSize(now);
auto reservedSize = ReserveSize();
ui64 size = dataSize > reservedSize ? dataSize - reservedSize : 0;
auto size = dataSize > reservedSize ? dataSize - reservedSize : 0;
return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds
}

Expand Down Expand Up @@ -749,7 +763,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext

result.SetReadBytesQuota(maxQuota);

result.SetPartitionSize(MeteringDataSize());
result.SetPartitionSize(UserDataSize());
result.SetUsedReserveSize(UsedReserveSize(ctx));

result.SetLastWriteTimestampMs(WriteTimestamp.MilliSeconds());
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
}

// The size of the data that was actually persisted in the storage by the partition
ui64 MeteringDataSize() const;
ui64 UserDataSize() const;
// The size of the data that is metered to the user
ui64 MeteringDataSize(TInstant now) const;
// The size of the storage that was reserved by the partition
ui64 ReserveSize() const;
// The size of the storage that is used by the partition. That includes the combination of the reserve and the actually persisted data.
Expand Down Expand Up @@ -951,4 +953,3 @@ class TPartition : public TActorBootstrapped<TPartition> {
};

} // namespace NKikimr::NPQ

2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1764,7 +1764,7 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& /*ctx*/, const ui
return withSize > 0 || Size() > 0;
}

return MeteringDataSize() + withSize > ReserveSize();
return UserDataSize() + withSize > ReserveSize();
}

void TPartition::RequestBlobQuota(size_t quotaSize)
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,6 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) {
return result;
};


MeteringSink.Create(ctx.Now(), {
.FlushInterval = TDuration::Seconds(pqConfig.GetBillingMeteringConfig().GetFlushIntervalSec()),
.TabletId = ToString(TabletID()),
Expand All @@ -1180,7 +1179,7 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) {
.YdbDatabaseId = Config.GetYdbDatabaseId(),
.StreamName = streamName,
.ResourceId = streamPath,
.PartitionsSize = Config.PartitionsSize(),
.PartitionsSize = CountActivePartitions(Config.GetPartitions()),
.WriteQuota = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(),
.ReservedSpace = storageLimitBytes,
.ConsumersCount = countReadRulesWithPricing(ctx, Config),
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config) {
return config.has_partitionstrategy() && config.partitionstrategy().has_partitionstrategytype() && config.partitionstrategy().partitionstrategytype() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED;
}

// Counts the entries of `partitions` whose status equals ETopicPartitionStatus::Active.
size_t CountActivePartitions(const ::google::protobuf::RepeatedPtrField< ::NKikimrPQ::TPQTabletConfig_TPartition >& partitions) {
    size_t activeCount = 0;
    for (const auto& partition : partitions) {
        if (partition.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active) {
            ++activeCount;
        }
    }
    return activeCount;
}

static constexpr ui64 PUT_UNIT_SIZE = 40960u; // 40Kb

ui64 PutUnitsSize(const ui64 size) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config);

bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config);

size_t CountActivePartitions(const ::google::protobuf::RepeatedPtrField< ::NKikimrPQ::TPQTabletConfig_TPartition >& partitions);

ui64 PutUnitsSize(const ui64 size);

TString SourceIdHash(const TString& sourceId);
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2475,8 +2475,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
Y_ABORT_UNLESS(it != Self->Topics.end());
Y_ABORT_UNLESS(it->second);
TTopicInfo::TPtr pqGroup = it->second;
if (pqInfo->AlterVersion <= pqGroup->AlterVersion)
if (pqInfo->AlterVersion <= pqGroup->AlterVersion) {
++pqGroup->TotalPartitionCount;
if (pqInfo->Status == NKikimrPQ::ETopicPartitionStatus::Active) {
++pqGroup->ActivePartitionCount;
}
}
if (pqInfo->PqId >= pqGroup->NextPartitionId) {
pqGroup->NextPartitionId = pqInfo->PqId + 1;
pqGroup->TotalGroupCount = pqInfo->PqId + 1;
Expand Down Expand Up @@ -4058,20 +4062,22 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {

if (path->IsPQGroup()) {
auto pqGroup = Self->Topics.at(path->PathId);
auto delta = pqGroup->AlterData ? pqGroup->AlterData->TotalPartitionCount : pqGroup->TotalPartitionCount;
auto partitionDelta = pqGroup->AlterData ? pqGroup->AlterData->TotalPartitionCount : pqGroup->TotalPartitionCount;
auto activePartitionDelta = pqGroup->AlterData ? pqGroup->AlterData->ActivePartitionCount : pqGroup->ActivePartitionCount;

auto tabletConfig = pqGroup->AlterData ? (pqGroup->AlterData->TabletConfig.empty() ? pqGroup->TabletConfig : pqGroup->AlterData->TabletConfig)
: pqGroup->TabletConfig;
NKikimrPQ::TPQTabletConfig config;
Y_ABORT_UNLESS(!tabletConfig.empty());
bool parseOk = ParseFromStringNoSizeLimit(config, tabletConfig);
Y_ABORT_UNLESS(parseOk);

const PQGroupReserve reserve(config, delta);
const PQGroupReserve reserve(config, activePartitionDelta);

inclusiveDomainInfo->IncPQPartitionsInside(delta);
inclusiveDomainInfo->IncPQPartitionsInside(partitionDelta);
inclusiveDomainInfo->IncPQReservedStorage(reserve.Storage);

Self->TabletCounters->Simple()[COUNTER_STREAM_SHARDS_COUNT].Add(delta);
Self->TabletCounters->Simple()[COUNTER_STREAM_SHARDS_COUNT].Add(partitionDelta);
Self->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Add(reserve.Throughput);
Self->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Add(reserve.Storage);
}
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ class TAlterPQ: public TSubOperation {
return result;
}

alterData->ActivePartitionCount = topic->ActivePartitionCount;

bool splitMergeEnabled = AppData()->FeatureFlags.GetEnableTopicSplitMerge()
&& NKikimr::NPQ::SplitMergeEnabled(tabletConfig)
&& NKikimr::NPQ::SplitMergeEnabled(newTabletConfig);
Expand All @@ -593,6 +595,7 @@ class TAlterPQ: public TSubOperation {

for (const auto& split : alter.GetSplit()) {
alterData->TotalGroupCount += 2;
++alterData->ActivePartitionCount;

const auto splittedPartitionId = split.GetPartition();
if (!topic->Partitions.contains(splittedPartitionId)) {
Expand Down Expand Up @@ -654,6 +657,7 @@ class TAlterPQ: public TSubOperation {
}
for (const auto& merge : alter.GetMerge()) {
alterData->TotalGroupCount += 1;
--alterData->ActivePartitionCount;

const auto partitionId = merge.GetPartition();
if (!topic->Partitions.contains(partitionId)) {
Expand Down Expand Up @@ -737,6 +741,10 @@ class TAlterPQ: public TSubOperation {
}

alterData->TotalPartitionCount = topic->TotalPartitionCount + alterData->PartitionsToAdd.size();
if (!splitMergeEnabled) {
alterData->ActivePartitionCount = alterData->TotalPartitionCount;
}

alterData->NextPartitionId = topic->NextPartitionId;
for (const auto& p : alterData->PartitionsToAdd) {
if (p.GroupId == 0 || p.GroupId > alterData->TotalGroupCount) {
Expand Down Expand Up @@ -780,8 +788,8 @@ class TAlterPQ: public TSubOperation {
return result;
}

const PQGroupReserve reserve(newTabletConfig, alterData->TotalPartitionCount);
const PQGroupReserve oldReserve(tabletConfig, topic->TotalPartitionCount);
const PQGroupReserve reserve(newTabletConfig, alterData->ActivePartitionCount);
const PQGroupReserve oldReserve(tabletConfig, topic->ActivePartitionCount);

const ui64 storageToReserve = reserve.Storage > oldReserve.Storage ? reserve.Storage - oldReserve.Storage : 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context,

pqGroupInfo->TotalGroupCount = partitionCount;
pqGroupInfo->TotalPartitionCount = partitionCount;
pqGroupInfo->ActivePartitionCount = partitionCount;

ui32 tabletCount = pqGroupInfo->ExpectedShardCount();
if (tabletCount > TSchemeShard::MaxPQGroupTabletsCount) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class TPropose: public TSubOperationState {
bool parseOk = ParseFromStringNoSizeLimit(config, tabletConfig);
Y_ABORT_UNLESS(parseOk);

const PQGroupReserve reserve(config, pqGroup->TotalPartitionCount);
const PQGroupReserve reserve(config, pqGroup->ActivePartitionCount);

auto domainInfo = context.SS->ResolveDomainInfo(pathId);
domainInfo->DecPathsInside();
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ struct TTopicInfo : TSimpleRefCount<TTopicInfo> {
TTabletId BalancerTabletID = InvalidTabletId;
TShardIdx BalancerShardIdx = InvalidShardIdx;
THashMap<ui32, TTopicTabletInfo::TTopicPartitionInfo*> Partitions;
size_t ActivePartitionCount = 0;

TString PreSerializedPathDescription; // Cached path description
TString PreSerializedPartitionsDescription; // Cached partition description
Expand Down Expand Up @@ -1222,6 +1223,7 @@ struct TTopicInfo : TSimpleRefCount<TTopicInfo> {
alterData->AlterVersion = AlterVersion + 1;
Y_ABORT_UNLESS(alterData->TotalGroupCount);
Y_ABORT_UNLESS(alterData->TotalPartitionCount);
Y_ABORT_UNLESS(0 < alterData->ActivePartitionCount && alterData->ActivePartitionCount <= alterData->TotalPartitionCount);
Y_ABORT_UNLESS(alterData->NextPartitionId);
Y_ABORT_UNLESS(alterData->MaxPartsPerTablet);
alterData->KeySchema = KeySchema;
Expand All @@ -1235,6 +1237,7 @@ struct TTopicInfo : TSimpleRefCount<TTopicInfo> {
TotalGroupCount = AlterData->TotalGroupCount;
NextPartitionId = AlterData->NextPartitionId;
TotalPartitionCount = AlterData->TotalPartitionCount;
ActivePartitionCount = AlterData->ActivePartitionCount;
MaxPartsPerTablet = AlterData->MaxPartsPerTablet;
if (!AlterData->TabletConfig.empty())
TabletConfig = std::move(AlterData->TabletConfig);
Expand Down Expand Up @@ -3275,7 +3278,7 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl,
const THashMap<TString, ui32>& colName2Id,
const TSubDomainInfo& subDomain, TString& errStr);

std::optional<std::pair<i64, i64>> ValidateSequenceType(const TString& sequenceName, const TString& dataType,
std::optional<std::pair<i64, i64>> ValidateSequenceType(const TString& sequenceName, const TString& dataType,
const NKikimr::NScheme::TTypeRegistry& typeRegistry, bool pgTypesEnabled, TString& errStr);

}
Expand Down
70 changes: 69 additions & 1 deletion ydb/core/tx/schemeshard/ut_base/ut_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10125,7 +10125,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
}, 100);
UNIT_ASSERT_VALUES_EQUAL(splitKey, "(Uint64 : 0, Utf8 : a, Uint32 : 1)");
}

{
TString splitKey =
schemaHelper.FindSplitKey({
Expand Down Expand Up @@ -11072,6 +11072,74 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
AssertReserve("/MyRoot/Topic2", 3 * 17);
}

// Checks the reserved storage reported for a topic with split/merge enabled
// as its partitions are split and merged. Every expected value below is written
// as <partition count> * 13 * 19, i.e. multiplied by the LifetimeSeconds and
// WriteSpeedInBytesPerSecond values from the topic config.
Y_UNIT_TEST(TopicWithAutopartitioningReserveSize) {
TTestEnvOptions opts;
opts.EnableTopicSplitMerge(true);
opts.EnablePQConfigTransactionsAtSchemeShard(true);

TTestBasicRuntime runtime;

TTestEnv env(runtime, opts);
ui64 txId = 100;

// Describes `path` and verifies that the operation is finished and that the
// topic's reserved storage equals `expectedReservedStorage`.
const auto AssertReserve = [&] (TString path, ui64 expectedReservedStorage) {
TestDescribeResult(DescribePath(runtime, path),
{NLs::Finished,
NLs::TopicReservedStorage(expectedReservedStorage)});
};

// Create a single-partition topic in METERING_MODE_RESERVED_CAPACITY with
// WriteSpeedInBytesPerSecond set and the CAN_SPLIT_AND_MERGE strategy.
TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
Name: "Topic1"
TotalGroupCount: 1
PartitionPerTablet: 1
PQTabletConfig {
PartitionConfig {
LifetimeSeconds: 13
WriteSpeedInBytesPerSecond : 19
}
MeteringMode: METERING_MODE_RESERVED_CAPACITY
PartitionStrategy {
MinPartitionCount: 1
MaxPartitionCount: 7
PartitionStrategyType: CAN_SPLIT_AND_MERGE
}
}
)");
env.TestWaitNotification(runtime, txId);
AssertReserve("/MyRoot/Topic1", 1 * 13 * 19);

// Split partition 0. NOTE(review): presumably the split partition becomes
// inactive and stops contributing to the reserve - confirm against TAlterPQ.
TestAlterPQGroup(runtime, ++txId, "/MyRoot", R"(
Name: "Topic1"
Split {
Partition: 0
SplitBoundary: 'A'
}
)");
env.TestWaitNotification(runtime, txId);
AssertReserve("/MyRoot/Topic1", 2 * 13 * 19); // There are only 2 active partitions

// Split partition 1; the expected reserve grows by one more partition.
TestAlterPQGroup(runtime, ++txId, "/MyRoot", R"(
Name: "Topic1"
Split {
Partition: 1
SplitBoundary: '0'
}
)");
env.TestWaitNotification(runtime, txId);
AssertReserve("/MyRoot/Topic1", 3 * 13 * 19); // There are only 3 active partitions

// Merge partitions 2 and 4; the expected reserve shrinks by one partition.
TestAlterPQGroup(runtime, ++txId, "/MyRoot", R"(
Name: "Topic1"
Merge {
Partition: 2
AdjacentPartition: 4
}
)");
env.TestWaitNotification(runtime, txId);
AssertReserve("/MyRoot/Topic1", 2 * 13 * 19); // There are only 2 active partitions
}

Y_UNIT_TEST(FindSubDomainPathId) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
Expand Down

0 comments on commit 6f27c67

Please sign in to comment.