diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index a26171cc45ba..8b4384b5f0eb 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -245,7 +245,7 @@ void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) { ); } -ui64 TPartition::MeteringDataSize() const { +ui64 TPartition::UserDataSize() const { if (DataKeysBody.size() <= 1) { // tiny optimization - we do not meter very small queues up to 16MB return 0; @@ -260,25 +260,39 @@ ui64 TPartition::MeteringDataSize() const { return size >= lastBlobSize ? size - lastBlobSize : 0; } +ui64 TPartition::MeteringDataSize(TInstant now) const { + if (IsActive() || NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS == Config.GetMeteringMode()) { + return UserDataSize(); + } else { + // We only add the amount of data that is blocked by an important consumer. + ui64 size = 0; + auto expirationTimestamp = now - TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds()) - WAKE_TIMEOUT; + for (size_t i = 1; i < DataKeysBody.size() && DataKeysBody[i].Timestamp < expirationTimestamp; ++i) { + size += DataKeysBody[i].Size; + } + return size; + } +} + ui64 TPartition::ReserveSize() const { - return TopicPartitionReserveSize(Config); + return IsActive() ? TopicPartitionReserveSize(Config) : 0; } ui64 TPartition::StorageSize(const TActorContext&) const { - return std::max(MeteringDataSize(), ReserveSize()); + return std::max(UserDataSize(), ReserveSize()); } ui64 TPartition::UsedReserveSize(const TActorContext&) const { - return std::min(MeteringDataSize(), ReserveSize()); + return std::min(UserDataSize(), ReserveSize()); } ui64 TPartition::GetUsedStorage(const TInstant& now) { const auto duration = now - LastUsedStorageMeterTimestamp; LastUsedStorageMeterTimestamp = now; - auto dataSize = MeteringDataSize(); + auto dataSize = MeteringDataSize(now); auto reservedSize = ReserveSize(); - ui64 size = dataSize > reservedSize ? dataSize - reservedSize : 0; + auto size = dataSize > reservedSize ? dataSize - reservedSize : 0; return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds } @@ -749,7 +763,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext result.SetReadBytesQuota(maxQuota); - result.SetPartitionSize(MeteringDataSize()); + result.SetPartitionSize(UserDataSize()); result.SetUsedReserveSize(UsedReserveSize(ctx)); result.SetLastWriteTimestampMs(WriteTimestamp.MilliSeconds()); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 8be4308842af..e45dcd6def1a 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -457,7 +457,9 @@ class TPartition : public TActorBootstrapped { } // The size of the data realy was persisted in the storage by the partition - ui64 MeteringDataSize() const; + ui64 UserDataSize() const; + // The size of the data was metered to user + ui64 MeteringDataSize(TInstant now) const; // The size of the storage that was reserved by the partition ui64 ReserveSize() const; // The size of the storage that usud by the partition. That included combination of the reserver and realy persisted data. @@ -951,4 +953,3 @@ class TPartition : public TActorBootstrapped { }; } // namespace NKikimr::NPQ - diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index e0346e2f9010..2cb1f0664618 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1764,7 +1764,7 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& /*ctx*/, const ui return withSize > 0 || Size() > 0; } - return MeteringDataSize() + withSize > ReserveSize(); + return UserDataSize() + withSize > ReserveSize(); } void TPartition::RequestBlobQuota(size_t quotaSize) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 52a59d047da5..f86b619bbaf8 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1171,7 +1171,6 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) { return result; }; - MeteringSink.Create(ctx.Now(), { .FlushInterval = TDuration::Seconds(pqConfig.GetBillingMeteringConfig().GetFlushIntervalSec()), .TabletId = ToString(TabletID()), @@ -1180,7 +1179,7 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) { .YdbDatabaseId = Config.GetYdbDatabaseId(), .StreamName = streamName, .ResourceId = streamPath, - .PartitionsSize = Config.PartitionsSize(), + .PartitionsSize = CountActivePartitions(Config.GetPartitions()), .WriteQuota = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), .ReservedSpace = storageLimitBytes, .ConsumersCount = countReadRulesWithPricing(ctx, Config), diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 7d4ad4698cbe..828e20cfbec0 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -36,6 +36,12 @@ bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config) { return config.has_partitionstrategy() && config.partitionstrategy().has_partitionstrategytype() && config.partitionstrategy().partitionstrategytype() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED; } +size_t CountActivePartitions(const ::google::protobuf::RepeatedPtrField< ::NKikimrPQ::TPQTabletConfig_TPartition >& partitions) { + return std::count_if(partitions.begin(), partitions.end(), [](const auto& p) { + return p.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active; + }); +} + static constexpr ui64 PUT_UNIT_SIZE = 40960u; // 40Kb ui64 PutUnitsSize(const ui64 size) { diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index 55097d10c7d2..27373a851831 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -11,6 +11,8 @@ ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config); bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config); +size_t CountActivePartitions(const ::google::protobuf::RepeatedPtrField< ::NKikimrPQ::TPQTabletConfig_TPartition >& partitions); + ui64 PutUnitsSize(const ui64 size); TString SourceIdHash(const TString& sourceId); diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 0b77b71a9656..3082db929199 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -2475,8 +2475,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase { Y_ABORT_UNLESS(it != Self->Topics.end()); Y_ABORT_UNLESS(it->second); TTopicInfo::TPtr pqGroup = it->second; - if (pqInfo->AlterVersion <= pqGroup->AlterVersion) + if (pqInfo->AlterVersion <= pqGroup->AlterVersion) { ++pqGroup->TotalPartitionCount; + if (pqInfo->Status == NKikimrPQ::ETopicPartitionStatus::Active) { + ++pqGroup->ActivePartitionCount; + } + } if (pqInfo->PqId >= pqGroup->NextPartitionId) { pqGroup->NextPartitionId = pqInfo->PqId + 1; pqGroup->TotalGroupCount = pqInfo->PqId + 1; @@ -4058,7 +4062,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase { if (path->IsPQGroup()) { auto pqGroup = Self->Topics.at(path->PathId); - auto delta = pqGroup->AlterData ? pqGroup->AlterData->TotalPartitionCount : pqGroup->TotalPartitionCount; + auto partitionDelta = pqGroup->AlterData ? pqGroup->AlterData->TotalPartitionCount : pqGroup->TotalPartitionCount; + auto activePartitionDelta = pqGroup->AlterData ? pqGroup->AlterData->ActivePartitionCount : pqGroup->ActivePartitionCount; + auto tabletConfig = pqGroup->AlterData ? (pqGroup->AlterData->TabletConfig.empty() ? pqGroup->TabletConfig : pqGroup->AlterData->TabletConfig) : pqGroup->TabletConfig; NKikimrPQ::TPQTabletConfig config; @@ -4066,12 +4072,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase { bool parseOk = ParseFromStringNoSizeLimit(config, tabletConfig); Y_ABORT_UNLESS(parseOk); - const PQGroupReserve reserve(config, delta); + const PQGroupReserve reserve(config, activePartitionDelta); - inclusiveDomainInfo->IncPQPartitionsInside(delta); + inclusiveDomainInfo->IncPQPartitionsInside(partitionDelta); inclusiveDomainInfo->IncPQReservedStorage(reserve.Storage); - Self->TabletCounters->Simple()[COUNTER_STREAM_SHARDS_COUNT].Add(delta); + Self->TabletCounters->Simple()[COUNTER_STREAM_SHARDS_COUNT].Add(partitionDelta); Self->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Add(reserve.Throughput); Self->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Add(reserve.Storage); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index dac4b58742ce..1c28386c9b82 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -577,6 +577,8 @@ class TAlterPQ: public TSubOperation { return result; } + alterData->ActivePartitionCount = topic->ActivePartitionCount; + bool splitMergeEnabled = AppData()->FeatureFlags.GetEnableTopicSplitMerge() && NKikimr::NPQ::SplitMergeEnabled(tabletConfig) && NKikimr::NPQ::SplitMergeEnabled(newTabletConfig); @@ -593,6 +595,7 @@ class TAlterPQ: public TSubOperation { for (const auto& split : alter.GetSplit()) { alterData->TotalGroupCount += 2; + ++alterData->ActivePartitionCount; const auto splittedPartitionId = split.GetPartition(); if (!topic->Partitions.contains(splittedPartitionId)) { @@ -654,6 +657,7 @@ class TAlterPQ: public TSubOperation { } for (const auto& merge : alter.GetMerge()) { alterData->TotalGroupCount += 1; + --alterData->ActivePartitionCount; const auto partitionId = merge.GetPartition(); if (!topic->Partitions.contains(partitionId)) { @@ -737,6 +741,10 @@ class TAlterPQ: public TSubOperation { } alterData->TotalPartitionCount = topic->TotalPartitionCount + alterData->PartitionsToAdd.size(); + if (!splitMergeEnabled) { + alterData->ActivePartitionCount = alterData->TotalPartitionCount; + } + alterData->NextPartitionId = topic->NextPartitionId; for (const auto& p : alterData->PartitionsToAdd) { if (p.GroupId == 0 || p.GroupId > alterData->TotalGroupCount) { @@ -780,8 +788,8 @@ class TAlterPQ: public TSubOperation { return result; } - const PQGroupReserve reserve(newTabletConfig, alterData->TotalPartitionCount); - const PQGroupReserve oldReserve(tabletConfig, topic->TotalPartitionCount); + const PQGroupReserve reserve(newTabletConfig, alterData->ActivePartitionCount); + const PQGroupReserve oldReserve(tabletConfig, topic->ActivePartitionCount); const ui64 storageToReserve = reserve.Storage > oldReserve.Storage ? reserve.Storage - oldReserve.Storage : 0; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp index a83c459588bd..4cd1303a1fa3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp @@ -155,6 +155,7 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context, pqGroupInfo->TotalGroupCount = partitionCount; pqGroupInfo->TotalPartitionCount = partitionCount; + pqGroupInfo->ActivePartitionCount = partitionCount; ui32 tabletCount = pqGroupInfo->ExpectedShardCount(); if (tabletCount > TSchemeShard::MaxPQGroupTabletsCount) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp index 70d663c713eb..5e4f9ca67177 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp @@ -183,7 +183,7 @@ class TPropose: public TSubOperationState { bool parseOk = ParseFromStringNoSizeLimit(config, tabletConfig); Y_ABORT_UNLESS(parseOk); - const PQGroupReserve reserve(config, pqGroup->TotalPartitionCount); + const PQGroupReserve reserve(config, pqGroup->ActivePartitionCount); auto domainInfo = context.SS->ResolveDomainInfo(pathId); domainInfo->DecPathsInside(); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 938107d12c6b..4eb70beac9c8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -1126,6 +1126,7 @@ struct TTopicInfo : TSimpleRefCount { TTabletId BalancerTabletID = InvalidTabletId; TShardIdx BalancerShardIdx = InvalidShardIdx; THashMap Partitions; + size_t ActivePartitionCount = 0; TString PreSerializedPathDescription; // Cached path description TString PreSerializedPartitionsDescription; // Cached partition description @@ -1222,6 +1223,7 @@ struct TTopicInfo : TSimpleRefCount { alterData->AlterVersion = AlterVersion + 1; Y_ABORT_UNLESS(alterData->TotalGroupCount); Y_ABORT_UNLESS(alterData->TotalPartitionCount); + Y_ABORT_UNLESS(0 < alterData->ActivePartitionCount && alterData->ActivePartitionCount <= alterData->TotalPartitionCount); Y_ABORT_UNLESS(alterData->NextPartitionId); Y_ABORT_UNLESS(alterData->MaxPartsPerTablet); alterData->KeySchema = KeySchema; @@ -1235,6 +1237,7 @@ struct TTopicInfo : TSimpleRefCount { TotalGroupCount = AlterData->TotalGroupCount; NextPartitionId = AlterData->NextPartitionId; TotalPartitionCount = AlterData->TotalPartitionCount; + ActivePartitionCount = AlterData->ActivePartitionCount; MaxPartsPerTablet = AlterData->MaxPartsPerTablet; if (!AlterData->TabletConfig.empty()) TabletConfig = std::move(AlterData->TabletConfig); @@ -3275,7 +3278,7 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl, const THashMap& colName2Id, const TSubDomainInfo& subDomain, TString& errStr); -std::optional> ValidateSequenceType(const TString& sequenceName, const TString& dataType, +std::optional> ValidateSequenceType(const TString& sequenceName, const TString& dataType, const NKikimr::NScheme::TTypeRegistry& typeRegistry, bool pgTypesEnabled, TString& errStr); } diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp index 5e85a674c428..5ce0598b4342 100644 --- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp @@ -10125,7 +10125,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { }, 100); UNIT_ASSERT_VALUES_EQUAL(splitKey, "(Uint64 : 0, Utf8 : a, Uint32 : 1)"); } - + { TString splitKey = schemaHelper.FindSplitKey({ @@ -11072,6 +11072,74 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { AssertReserve("/MyRoot/Topic2", 3 * 17); } + Y_UNIT_TEST(TopicWithAutopartitioningReserveSize) { + TTestEnvOptions opts; + opts.EnableTopicSplitMerge(true); + opts.EnablePQConfigTransactionsAtSchemeShard(true); + + TTestBasicRuntime runtime; + + TTestEnv env(runtime, opts); + ui64 txId = 100; + + const auto AssertReserve = [&] (TString path, ui64 expectedReservedStorage) { + TestDescribeResult(DescribePath(runtime, path), + {NLs::Finished, + NLs::TopicReservedStorage(expectedReservedStorage)}); + }; + + // create with WriteSpeedInBytesPerSecond + TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic1" + TotalGroupCount: 1 + PartitionPerTablet: 1 + PQTabletConfig { + PartitionConfig { + LifetimeSeconds: 13 + WriteSpeedInBytesPerSecond : 19 + } + MeteringMode: METERING_MODE_RESERVED_CAPACITY + PartitionStrategy { + MinPartitionCount: 1 + MaxPartitionCount: 7 + PartitionStrategyType: CAN_SPLIT_AND_MERGE + } + } + )"); + env.TestWaitNotification(runtime, txId); + AssertReserve("/MyRoot/Topic1", 1 * 13 * 19); + + TestAlterPQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic1" + Split { + Partition: 0 + SplitBoundary: 'A' + } + )"); + env.TestWaitNotification(runtime, txId); + AssertReserve("/MyRoot/Topic1", 2 * 13 * 19); // There are only 2 active partitions + + TestAlterPQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic1" + Split { + Partition: 1 + SplitBoundary: '0' + } + )"); + env.TestWaitNotification(runtime, txId); + AssertReserve("/MyRoot/Topic1", 3 * 13 * 19); // There are only 3 active partitions + + TestAlterPQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic1" + Merge { + Partition: 2 + AdjacentPartition: 4 + } + )"); + env.TestWaitNotification(runtime, txId); + AssertReserve("/MyRoot/Topic1", 2 * 13 * 19); // There are only 2 active partitions + } + Y_UNIT_TEST(FindSubDomainPathId) { TTestBasicRuntime runtime; TTestEnv env(runtime);