Skip to content

Commit

Permalink
dynamicconfig.MemoryClient supports subscriptions (#6562)
Browse files Browse the repository at this point in the history
## What changed?
Support subscriptions in dynamicconfig.MemoryClient

## Why?
Better more reliable tests

## How did you test it?
new unit test
  • Loading branch information
dnr authored Sep 27, 2024
1 parent 6d8baf9 commit 28d4815
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
36 changes: 35 additions & 1 deletion common/dynamicconfig/memory_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type (
MemoryClient struct {
lock sync.RWMutex
overrides []kvpair

subscriptionIdx int
subscriptions map[int]ClientUpdateFunc
}

kvpair struct {
Expand All @@ -44,13 +47,16 @@ type (

// NewMemoryClient - returns a memory based dynamic config client
func NewMemoryClient() *MemoryClient {
return &MemoryClient{}
return &MemoryClient{subscriptions: make(map[int]ClientUpdateFunc)}
}

func (d *MemoryClient) GetValue(key Key) []ConstrainedValue {
d.lock.RLock()
defer d.lock.RUnlock()
return d.getValueLocked(key)
}

func (d *MemoryClient) getValueLocked(key Key) []ConstrainedValue {
for i := len(d.overrides) - 1; i >= 0; i-- {
if d.overrides[i].valid && d.overrides[i].key == key {
v := d.overrides[i].value
Expand All @@ -63,6 +69,21 @@ func (d *MemoryClient) GetValue(key Key) []ConstrainedValue {
return nil
}

func (d *MemoryClient) Subscribe(f ClientUpdateFunc) (cancel func()) {
d.lock.Lock()
defer d.lock.Unlock()

d.subscriptionIdx++
id := d.subscriptionIdx
d.subscriptions[id] = f

return func() {
d.lock.Lock()
defer d.lock.Unlock()
delete(d.subscriptions, id)
}
}

func (d *MemoryClient) OverrideSetting(setting GenericSetting, value any) (cleanup func()) {
return d.OverrideValue(setting.Key(), value)
}
Expand All @@ -76,6 +97,12 @@ func (d *MemoryClient) OverrideValue(key Key, value any) (cleanup func()) {

d.overrides = append(d.overrides, kvpair{valid: true, key: key, value: value})

newValue := d.getValueLocked(key)
changed := map[Key][]ConstrainedValue{key: newValue}
for _, update := range d.subscriptions {
update(changed)
}

return func() {
// only do this once
if removeIdx := int(idx.Swap(-1)); removeIdx >= 0 {
Expand All @@ -88,11 +115,18 @@ func (d *MemoryClient) remove(idx int) {
d.lock.Lock()
defer d.lock.Unlock()

key := d.overrides[idx].key
// mark this pair deleted
d.overrides[idx] = kvpair{}

// pop all deleted pairs
for l := len(d.overrides); l > 0 && !d.overrides[l-1].valid; l = len(d.overrides) {
d.overrides = d.overrides[:l-1]
}

newValue := d.getValueLocked(key)
changed := map[Key][]ConstrainedValue{key: newValue}
for _, update := range d.subscriptions {
update(changed)
}
}
40 changes: 40 additions & 0 deletions common/dynamicconfig/memory_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,43 @@ func TestMemoryClient(t *testing.T) {
remove2() // no-op
assert.Nil(t, c.GetValue(k))
}

func TestMemoryClientSubscriptions(t *testing.T) {
c := dynamicconfig.NewMemoryClient()
k := dynamicconfig.Key("key")

calls := 0

c.Subscribe(func(changed map[dynamicconfig.Key][]dynamicconfig.ConstrainedValue) {
calls++
assert.Contains(t, changed, k)
switch calls {
case 1:
assert.Equal(t, []dynamicconfig.ConstrainedValue{{Value: 123}}, changed[k])
case 2:
assert.Nil(t, changed[k])
case 3:
assert.Equal(t, []dynamicconfig.ConstrainedValue{{Value: 456}}, changed[k])
case 4:
assert.Equal(t, []dynamicconfig.ConstrainedValue{{Value: 789}}, changed[k])
case 5:
assert.Equal(t, []dynamicconfig.ConstrainedValue{{Value: 456}}, changed[k])
case 6:
assert.Nil(t, changed[k])
}
})

remove := c.OverrideValue(k, 123)
assert.Equal(t, 1, calls)
remove()
assert.Equal(t, 2, calls)

remove1 := c.OverrideValue(k, 456)
assert.Equal(t, 3, calls)
remove2 := c.OverrideValue(k, 789)
assert.Equal(t, 4, calls)
remove2()
assert.Equal(t, 5, calls)
remove1()
assert.Equal(t, 6, calls)
}

0 comments on commit 28d4815

Please sign in to comment.