Skip to content

Commit

Permalink
add support of entity-default kafka quotas
Browse files Browse the repository at this point in the history
  • Loading branch information
azhurbilo committed Jun 12, 2022
1 parent b77f249 commit 135101e
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 10 deletions.
36 changes: 27 additions & 9 deletions kafka/kafka_quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,21 @@ func (c *Client) AlterQuota(quota Quota, validateOnly bool) error {
return err
}

entity := sarama.QuotaEntityComponent{
EntityType: sarama.QuotaEntityType(quota.EntityType),
MatchType: sarama.QuotaMatchExact,
Name: quota.EntityName,
}
var entity sarama.QuotaEntityComponent

if quota.EntityName == "" {
entity = sarama.QuotaEntityComponent{
EntityType: sarama.QuotaEntityType(quota.EntityType),
MatchType: sarama.QuotaMatchDefault,
}
} else {
entity = sarama.QuotaEntityComponent{
EntityType: sarama.QuotaEntityType(quota.EntityType),
MatchType: sarama.QuotaMatchExact,
Name: quota.EntityName,
}
}

configs := quota.Ops

ops := []sarama.ClientQuotasOp{}
Expand Down Expand Up @@ -92,10 +101,19 @@ func (c *Client) DescribeQuota(entityType string, entityName string) (*Quota, er
return nil, err
}

entity := sarama.QuotaFilterComponent{
EntityType: sarama.QuotaEntityType(entityType),
MatchType: sarama.QuotaMatchExact,
Match: entityName,
var entity sarama.QuotaFilterComponent

if entityName == "" {
entity = sarama.QuotaFilterComponent{
EntityType: sarama.QuotaEntityType(entityType),
MatchType: sarama.QuotaMatchDefault,
}
} else {
entity = sarama.QuotaFilterComponent{
EntityType: sarama.QuotaEntityType(entityType),
MatchType: sarama.QuotaMatchExact,
Match: entityName,
}
}

request := &sarama.DescribeClientQuotasRequest{
Expand Down
2 changes: 1 addition & 1 deletion kafka/resource_kafka_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func kafkaQuotaResource() *schema.Resource {
Type: schema.TypeString,
Required: true,
ForceNew: true,
Description: "The name of the entity",
Description: "The name of the entity (if entity_name is empty string, it will create default-entity Kafka quota)",
},
"entity_type": {
Type: schema.TypeString,
Expand Down
38 changes: 38 additions & 0 deletions kafka/resource_kafka_quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,44 @@ func TestAcc_QuotaConfigUpdate(t *testing.T) {
})
}

func TestAcc_DefaultEntityBasicQuota(t *testing.T) {
emptyEntityName := ""
bs := testBootstrapServers[0]

r.Test(t, r.TestCase{
ProviderFactories: overrideProviderFactory(),
PreCheck: func() { testAccPreCheck(t) },
CheckDestroy: testAccCheckQuotaDestroy,
Steps: []r.TestStep{
{
Config: cfgs(t, bs, fmt.Sprintf(testResourceQuota1, emptyEntityName, "4000000")),
Check: testResourceQuota_initialCheck,
},
},
})
}

func TestAcc_DefaultEntityQuotaConfigUpdate(t *testing.T) {
emptyEntityName := ""
bs := testBootstrapServers[0]

r.Test(t, r.TestCase{
ProviderFactories: overrideProviderFactory(),
PreCheck: func() { testAccPreCheck(t) },
CheckDestroy: testAccCheckQuotaDestroy,
Steps: []r.TestStep{
{
Config: cfg(t, bs, fmt.Sprintf(testResourceQuota1, emptyEntityName, "4000000")),
Check: testResourceQuota_initialCheck,
},
{
Config: cfg(t, bs, fmt.Sprintf(testResourceQuota1, emptyEntityName, "3000000")),
Check: testResourceQuota_updateCheck,
},
},
})
}

func testResourceQuota_initialCheck(s *terraform.State) error {
resourceState := s.Modules[0].Resources["kafka_quota.test1"]
if resourceState == nil {
Expand Down

0 comments on commit 135101e

Please sign in to comment.