Skip to content

Commit

Permalink
chore: add sd tests (netdata#1406)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyam8 authored Oct 30, 2023
1 parent e4c7c15 commit 7d5b117
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 106 deletions.
103 changes: 0 additions & 103 deletions agent/discovery/sd/manager.go

This file was deleted.

3 changes: 0 additions & 3 deletions agent/discovery/sd/manager_test.go

This file was deleted.

106 changes: 106 additions & 0 deletions agent/discovery/sd/sd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// SPDX-License-Identifier: GPL-3.0-or-later

package sd

import (
"context"
"sync"

"github.com/netdata/go.d.plugin/agent/confgroup"
"github.com/netdata/go.d.plugin/agent/discovery/sd/pipeline"
"github.com/netdata/go.d.plugin/logger"

"gopkg.in/yaml.v2"
)

func NewServiceDiscovery() (*ServiceDiscovery, error) {
return nil, nil
}

type (
ServiceDiscovery struct {
*logger.Logger

confProv ConfigFileProvider
sdFactory sdPipelineFactory

confCache map[string]uint64
pipelines map[string]func()
}
sdPipeline interface {
Run(ctx context.Context, in chan<- []*confgroup.Group)
}
sdPipelineFactory interface {
create(config pipeline.Config) (sdPipeline, error)
}
)

func (d *ServiceDiscovery) Run(ctx context.Context, in chan<- []*confgroup.Group) {
d.Info("instance is started")
defer d.Info("instance is stopped")
defer d.cleanup()

var wg sync.WaitGroup

wg.Add(1)
go func() { defer wg.Done(); d.confProv.Run(ctx) }()

for {
select {
case <-ctx.Done():
return
case cf := <-d.confProv.Configs():
if cf.Source == "" {
continue
}
if len(cf.Data) == 0 {
delete(d.confCache, cf.Source)
d.removePipeline(cf)
} else if hash, ok := d.confCache[cf.Source]; !ok || hash != cf.Hash() {
d.confCache[cf.Source] = cf.Hash()
d.addPipeline(ctx, cf, in)
}
}
}
}

func (d *ServiceDiscovery) addPipeline(ctx context.Context, cf ConfigFile, in chan<- []*confgroup.Group) {
var cfg pipeline.Config

if err := yaml.Unmarshal(cf.Data, &cfg); err != nil {
d.Error(err)
return
}

pl, err := d.sdFactory.create(cfg)
if err != nil {
d.Error(err)
return
}

if stop, ok := d.pipelines[cf.Source]; ok {
stop()
}

var wg sync.WaitGroup
plCtx, cancel := context.WithCancel(ctx)

wg.Add(1)
go func() { defer wg.Done(); pl.Run(plCtx, in) }()
stop := func() { cancel(); wg.Wait() }

d.pipelines[cf.Source] = stop
}

func (d *ServiceDiscovery) removePipeline(cf ConfigFile) {
if stop, ok := d.pipelines[cf.Source]; ok {
delete(d.pipelines, cf.Source)
stop()
}
}

func (d *ServiceDiscovery) cleanup() {
for _, stop := range d.pipelines {
stop()
}
}
89 changes: 89 additions & 0 deletions agent/discovery/sd/sd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// SPDX-License-Identifier: GPL-3.0-or-later

package sd

import (
"testing"

"github.com/netdata/go.d.plugin/agent/discovery/sd/pipeline"

"gopkg.in/yaml.v2"
)

func TestServiceDiscovery_Run(t *testing.T) {
tests := map[string]discoverySim{
"add pipeline": {
configs: []ConfigFile{
prepareConfigFile("source", "name"),
},
wantPipelines: []*mockPipeline{
{name: "name", started: true, stopped: false},
},
},
"remove pipeline": {
configs: []ConfigFile{
prepareConfigFile("source", "name"),
prepareEmptyConfigFile("source"),
},
wantPipelines: []*mockPipeline{
{name: "name", started: true, stopped: true},
},
},
"re-add pipeline multiple times": {
configs: []ConfigFile{
prepareConfigFile("source", "name"),
prepareConfigFile("source", "name"),
prepareConfigFile("source", "name"),
},
wantPipelines: []*mockPipeline{
{name: "name", started: true, stopped: false},
},
},
"restart pipeline": {
configs: []ConfigFile{
prepareConfigFile("source", "name1"),
prepareConfigFile("source", "name2"),
},
wantPipelines: []*mockPipeline{
{name: "name1", started: true, stopped: true},
{name: "name2", started: true, stopped: false},
},
},
"invalid pipeline config": {
configs: []ConfigFile{
prepareConfigFile("source", "invalid"),
},
wantPipelines: nil,
},
"invalid config for running pipeline": {
configs: []ConfigFile{
prepareConfigFile("source", "name"),
prepareConfigFile("source", "invalid"),
},
wantPipelines: []*mockPipeline{
{name: "name", started: true, stopped: false},
},
},
}

for name, sim := range tests {
t.Run(name, func(t *testing.T) {
sim.run(t)
})
}
}

func prepareConfigFile(source, name string) ConfigFile {
bs, _ := yaml.Marshal(pipeline.Config{Name: name})

return ConfigFile{
Source: source,
Data: bs,
}
}

func prepareEmptyConfigFile(source string) ConfigFile {
return ConfigFile{
Source: source,
}
}
Loading

0 comments on commit 7d5b117

Please sign in to comment.