Skip to content

Commit

Permalink
feat(cascadingprocessor): add collector instances configuration (#1358)
Browse files Browse the repository at this point in the history
* feat(cascadingfilter): introduce collector_instances option
* feat(cascadingfilter): update default cfg
* feat(cascadingfilter): calculate limits using collector instances
* chore(cascadingfilter): update tests
* chore(cascadingfilter): update log msg
  • Loading branch information
mat-rumian committed Dec 19, 2023
1 parent d1281de commit 3223aa8
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 27 deletions.
1 change: 1 addition & 0 deletions .changelog/1358.added.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat(cascadingfilter): add collector_instances config option for spans_per_second global and policy limits scaling
4 changes: 4 additions & 0 deletions pkg/processor/cascadingfilterprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The following configuration options should be configured as desired:

The following configuration options can also be modified:

- `collector_instances` (default = 1): In case of multiple deployments **sharing a single configuration** of the `cascadingfilter`, this option should be used to properly scale down the `spans_per_second` global and policy limits. The value should be a positive integer corresponding to the number of collectors with configured cascadingfilters, e.g. `collector_instances=5`. As a result, the configured `spans_per_second` limit will be divided by `5` for the global and policy limits.
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a filtering decision
- `num_traces` (default = 100000): Max number of traces for which decisions are kept in memory
- `history_size` (default = `num_traces` value): Max size of LRU cache used for storing decisions on already processed traces
Expand All @@ -28,6 +29,9 @@ The following configuration options can also be modified:

Whenever rate limiting is applied, only full traces are accepted (if trace won't fit within the limit, it will never be filtered). For spans that are arriving late, previous decision are kept for some time.

In case of multiple deployments **sharing a single configuration file** of the `cascadingfilter`, an environment variable called `SUMO_COLLECTOR_INSTANCES` should be used to properly scale down the `spans_per_second` global and policy limits. `SUMO_COLLECTOR_INSTANCES` should be a positive integer corresponding to the number of collectors with configured cascadingfilters, e.g. `SUMO_COLLECTOR_INSTANCES=5`.
As a result, the configured `spans_per_second` limit will be divided by `5` for the global and policy limits.

## Updated span attributes

The processor modifies each span attributes, by setting following two attributes:
Expand Down
7 changes: 5 additions & 2 deletions pkg/processor/cascadingfilterprocessor/cascade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var testValue = 10 * time.Millisecond
var probabilisticFilteringRate = int32(10)
var healthCheckPattern = "health"
var cfg = cfconfig.Config{
CollectorInstances: 1,
DecisionWait: 2 * time.Second,
NumTraces: 100,
ExpectedNewTracesPerSec: 100,
Expand All @@ -60,8 +61,9 @@ var cfg = cfconfig.Config{
}

var cfgJustDropping = cfconfig.Config{
DecisionWait: 2 * time.Second,
NumTraces: 100,
CollectorInstances: 1,
DecisionWait: 2 * time.Second,
NumTraces: 100,
TraceRejectCfgs: []cfconfig.TraceRejectCfg{
{
Name: "health-check",
Expand All @@ -71,6 +73,7 @@ var cfgJustDropping = cfconfig.Config{
}

var cfgAutoRate = cfconfig.Config{
CollectorInstances: 1,
DecisionWait: 2 * time.Second,
ProbabilisticFilteringRate: &probabilisticFilteringRate,
NumTraces: 100,
Expand Down
4 changes: 4 additions & 0 deletions pkg/processor/cascadingfilterprocessor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ type TraceRejectCfg struct {

// Config holds the configuration for cascading-filter-based sampling.
type Config struct {
// CollectorInstances is the number of collectors sharing single configuration for
// cascadingfilter processor. This number is used to calculate global and policy limits
// for spans_per_second. Default value is 1.
CollectorInstances uint `mapstructure:"collector_instances"`
// DecisionWait is the desired wait time from the arrival of the first span of
// trace until the decision about sampling it or not is evaluated.
DecisionWait time.Duration `mapstructure:"decision_wait"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/cascadingfilterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestLoadConfig(t *testing.T) {
id1 := component.NewIDWithName("cascading_filter", "1")
assert.Equal(t, cfg.Processors[id1],
&cfconfig.Config{
CollectorInstances: 1,
DecisionWait: 30 * time.Second,
SpansPerSecond: 0,
NumTraces: 100000,
Expand Down Expand Up @@ -101,6 +102,7 @@ func TestLoadConfig(t *testing.T) {
priorHistorySize2 := uint64(100)
assert.Equal(t, cfg.Processors[id2],
&cfconfig.Config{
CollectorInstances: 1,
DecisionWait: 10 * time.Second,
NumTraces: 100,
ExpectedNewTracesPerSec: 10,
Expand Down
7 changes: 4 additions & 3 deletions pkg/processor/cascadingfilterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func NewFactory() processor.Factory {

func createDefaultConfig() component.Config {
return &cfconfig.Config{
DecisionWait: 30 * time.Second,
NumTraces: 100000,
SpansPerSecond: 0,
CollectorInstances: 1,
DecisionWait: 30 * time.Second,
NumTraces: 100000,
SpansPerSecond: 0,
}
}

Expand Down
41 changes: 35 additions & 6 deletions pkg/processor/cascadingfilterprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cascadingfilterprocessor

import (
"context"
"math"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -103,6 +104,8 @@ const (
AttributeSamplingLateArrival = "sampling.late_arrival"

AttributeSamplingProbability = "sampling.probability"

defaultCollectorInstancesNo = 1
)

// newTraceProcessor returns a processor.TraceProcessor that will perform Cascading Filter according to the given
Expand All @@ -126,6 +129,12 @@ func newCascadingFilterSpanProcessor(logger *zap.Logger, nextConsumer consumer.T
var policies []*TraceAcceptEvaluator
var dropTraceEvals []*TraceRejectEvaluator

// In case of lack of collectorInstances set default.
if cfg.CollectorInstances == 0 {
cfg.CollectorInstances = defaultCollectorInstancesNo
logger.Info("Using default collector instances", zap.Uint("value", defaultCollectorInstancesNo))
}

// Prepare Trace Reject config

for _, dropCfg := range cfg.TraceRejectCfgs {
Expand Down Expand Up @@ -166,6 +175,13 @@ func newCascadingFilterSpanProcessor(logger *zap.Logger, nextConsumer consumer.T
if err != nil {
return nil, err
}

if policyCfg.SpansPerSecond > 0 {
policyCalculatedSpansPerSecond := calculateSpansPerSecond(policyCfg.SpansPerSecond, cfg.CollectorInstances)
policyCfg.SpansPerSecond = policyCalculatedSpansPerSecond
totalRate += policyCfg.SpansPerSecond
}

eval, err := buildPolicyEvaluator(logger, &policyCfg)
if err != nil {
return nil, err
Expand All @@ -176,17 +192,20 @@ func newCascadingFilterSpanProcessor(logger *zap.Logger, nextConsumer consumer.T
ctx: policyCtx,
probabilisticFilter: false,
}
if policyCfg.SpansPerSecond > 0 {
totalRate += policyCfg.SpansPerSecond
}

logger.Info("Adding trace accept rule",
zap.String("name", policyCfg.Name),
zap.Int32("spans_per_second", policyCfg.SpansPerSecond))
zap.Int32("spans_per_second", policyCfg.SpansPerSecond),
zap.Uint("collector_instances", cfg.CollectorInstances),
)

policies = append(policies, policy)
}

// Recalculate the total spans per second rate if needed
spansPerSecond := cfg.SpansPerSecond
calculatedGlobalSpansPerSecond := calculateSpansPerSecond(cfg.SpansPerSecond, cfg.CollectorInstances)
cfg.SpansPerSecond = calculatedGlobalSpansPerSecond
spansPerSecond := calculatedGlobalSpansPerSecond
if spansPerSecond == 0 {
spansPerSecond = totalRate
if cfg.ProbabilisticFilteringRate != nil && *cfg.ProbabilisticFilteringRate > 0 {
Expand All @@ -195,7 +214,10 @@ func newCascadingFilterSpanProcessor(logger *zap.Logger, nextConsumer consumer.T
}

if spansPerSecond != 0 {
logger.Info("Setting total spans per second limit", zap.Int32("spans_per_second", spansPerSecond))
logger.Info("Setting total spans per second limit, based on configured collector instances",
zap.Int32("spans_per_second", spansPerSecond),
zap.Uint("collector_instances", cfg.CollectorInstances),
)
} else {
logger.Info("Not setting total spans per second limit (only selected traces will be filtered out)")
}
Expand Down Expand Up @@ -511,3 +533,10 @@ func (pt *policyTicker) Stop() {
}

var _ tTicker = (*policyTicker)(nil)

// calculateSpansPerSecond divides the configured spans-per-second limit across
// the given number of collector instances, rounding up so that a positive
// limit never collapses to zero per instance.
//
// collectorInstances is expected to be >= 1 (the caller substitutes the
// default when it is unset); 0 is treated as 1 here to avoid a division that
// would produce +Inf, whose conversion to int32 is not defined by the Go spec.
func calculateSpansPerSecond(spansPerSecond int32, collectorInstances uint) int32 {
	if collectorInstances == 0 {
		collectorInstances = 1
	}
	perInstance := float64(spansPerSecond) / float64(collectorInstances)

	return int32(math.Ceil(perInstance))
}
96 changes: 80 additions & 16 deletions pkg/processor/cascadingfilterprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,91 @@ const (
)

//nolint:unused
var testPolicy = []cfconfig.TraceAcceptCfg{{
Name: "test-policy",
SpansPerSecond: 1000,
}}

func buildBasicCFSP(t *testing.T, numTraces uint64) *cascadingFilterSpanProcessor {
cfg := cfconfig.Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: numTraces,
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
// testPolicy exercises per-policy spans_per_second scaling: two positive
// limits (100 and 10) that get divided by the collector-instance count, and
// one unlimited policy (-1) that must be excluded when summing the total rate.
var testPolicy = []cfconfig.TraceAcceptCfg{
	{
		Name:           "test-policy",
		SpansPerSecond: 100,
	},
	{
		Name:           "test-policy2",
		SpansPerSecond: 10,
	},
	{
		Name:           "test-policy3",
		SpansPerSecond: -1,
	}}

// buildBasicCFSP constructs a cascadingFilterSpanProcessor using the shared
// testPolicy accept rules, with the given trace-map size, global
// spans-per-second limit and collector-instance count.
//
// CollectorInstances is always set from the argument: the processor treats a
// value of 0 the same as the default of 1, so the previous special-casing of
// collectorInstances == 1 (which omitted the field) is unnecessary.
func buildBasicCFSP(t *testing.T, numTraces uint64, spansPerSecond int32, collectorInstances uint) *cascadingFilterSpanProcessor {
	// Declare a local config rather than assigning to the package-level cfg
	// shared with other tests in this package, which would leak state between
	// tests.
	cfg := cfconfig.Config{
		CollectorInstances:      collectorInstances,
		DecisionWait:            defaultTestDecisionWait,
		NumTraces:               numTraces,
		ExpectedNewTracesPerSec: 64,
		PolicyCfgs:              testPolicy,
		SpansPerSecond:          spansPerSecond,
	}

	sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg, component.NewID("cascading_filter"))
	require.NoError(t, err)

	return sp.(*cascadingFilterSpanProcessor)
}

// TestTotalSpansPerSecondForSumoCollectorInstancesDefault verifies that with a
// single collector instance the global spans_per_second limit is unchanged.
func TestTotalSpansPerSecondForSumoCollectorInstancesDefault(t *testing.T) {
	tsp := buildBasicCFSP(t, uint64(1000), int32(1000), 1)

	// require.Equal takes (t, expected, actual); this order keeps failure
	// messages labeled correctly.
	require.Equal(t, int32(1000), tsp.decisionSpansLimitter.maxSpansPerSecond)
}

// TestTotalSpansPerSecondForSumoCollectorInstances0 verifies that an unset
// (zero) collector_instances falls back to the default of 1, leaving the
// global limit unchanged.
func TestTotalSpansPerSecondForSumoCollectorInstances0(t *testing.T) {
	tsp := buildBasicCFSP(t, uint64(1000), int32(1000), 0)

	// require.Equal takes (t, expected, actual).
	require.Equal(t, int32(1000), tsp.decisionSpansLimitter.maxSpansPerSecond)
}

// TestTotalSpansPerSecondForSumoCollectorInstances10 verifies that the global
// spans_per_second limit is divided evenly by the collector-instance count.
func TestTotalSpansPerSecondForSumoCollectorInstances10(t *testing.T) {
	tsp := buildBasicCFSP(t, uint64(1000), int32(1000), 10)

	// require.Equal takes (t, expected, actual): 1000 / 10 = 100.
	require.Equal(t, int32(100), tsp.decisionSpansLimitter.maxSpansPerSecond)
}

// TestTotalSpansPerSecondForSumoCollectorInstancesRoundedUp verifies that a
// non-integral per-instance limit is rounded up rather than truncated.
func TestTotalSpansPerSecondForSumoCollectorInstancesRoundedUp(t *testing.T) {
	tsp := buildBasicCFSP(t, uint64(1000), int32(1000), 325)

	// require.Equal takes (t, expected, actual): ceil(1000 / 325) = 4.
	require.Equal(t, int32(4), tsp.decisionSpansLimitter.maxSpansPerSecond)
}

// TestTotalSpansPerSecondForSumoCollectorInstances0AndNoGlobalSpansPerSecondLimit
// verifies that without a global limit the total is the sum of the positive
// policy limits (100 + 10 from testPolicy; the -1 policy is excluded), with
// the default instance count of 1.
func TestTotalSpansPerSecondForSumoCollectorInstances0AndNoGlobalSpansPerSecondLimit(t *testing.T) {
	tsp := buildBasicCFSP(t, uint64(1000), int32(0), 0)

	// require.Equal takes (t, expected, actual).
	require.Equal(t, int32(110), tsp.decisionSpansLimitter.maxSpansPerSecond)
}

// TestTotalSpansPerSecondForSumoCollectorInstances10AndNoGlobalSpansPerSecondLimit
// verifies that per-policy limits are each scaled (and rounded up) before
// being summed: ceil(100/10) + ceil(10/10) = 11.
func TestTotalSpansPerSecondForSumoCollectorInstances10AndNoGlobalSpansPerSecondLimit(t *testing.T) {
	tsp := buildBasicCFSP(t, uint64(1000), int32(0), 10)

	// require.Equal takes (t, expected, actual).
	require.Equal(t, int32(11), tsp.decisionSpansLimitter.maxSpansPerSecond)
}

// TestTotalSpansPerSecondForSumoCollectorInstances0AndGlobalSpansPerSecondLimitLowerThanPolicies
// verifies that an explicit global limit wins even when it is lower than the
// sum of the policy limits.
func TestTotalSpansPerSecondForSumoCollectorInstances0AndGlobalSpansPerSecondLimitLowerThanPolicies(t *testing.T) {
	tsp := buildBasicCFSP(t, uint64(1000), int32(60), 0)

	// require.Equal takes (t, expected, actual).
	require.Equal(t, int32(60), tsp.decisionSpansLimitter.maxSpansPerSecond)
}

func TestSequentialTraceArrival(t *testing.T) {
traceIds, batches := generateIdsAndBatches(128)
tsp := buildBasicCFSP(t, uint64(2*len(traceIds)))
tsp := buildBasicCFSP(t, uint64(2*len(traceIds)), int32(1000), 1)
for _, batch := range batches {
assert.NoError(t, tsp.ConsumeTraces(context.Background(), batch))
}
Expand All @@ -78,7 +142,7 @@ func TestSequentialTraceArrival(t *testing.T) {
}

func TestDecisionHistory(t *testing.T) {
tsp := buildBasicCFSP(t, uint64(100))
tsp := buildBasicCFSP(t, uint64(100), int32(1000), 1)

id1 := bigendianconverter.UInt64ToTraceID(1, uint64(100))
id2 := bigendianconverter.UInt64ToTraceID(1, uint64(101))
Expand All @@ -104,7 +168,7 @@ func TestDecisionHistory(t *testing.T) {

func TestConcurrentTraceArrival(t *testing.T) {
traceIds, batches := generateIdsAndBatches(64)
tsp := buildBasicCFSP(t, uint64(2*len(traceIds)))
tsp := buildBasicCFSP(t, uint64(2*len(traceIds)), int32(1000), 1)

var wg sync.WaitGroup
for _, batch := range batches {
Expand Down Expand Up @@ -137,7 +201,7 @@ func TestConcurrentTraceArrival(t *testing.T) {
func TestSequentialTraceMapSize(t *testing.T) {
traceIds, batches := generateIdsAndBatches(210)
const maxSize = 100
tsp := buildBasicCFSP(t, uint64(maxSize))
tsp := buildBasicCFSP(t, uint64(maxSize), int32(1000), 1)

for _, batch := range batches {
if err := tsp.ConsumeTraces(context.Background(), batch); err != nil {
Expand All @@ -156,7 +220,7 @@ func TestConcurrentTraceMapSize(t *testing.T) {
_, batches := generateIdsAndBatches(64)
const maxSize = 50
var wg sync.WaitGroup
tsp := buildBasicCFSP(t, uint64(maxSize))
tsp := buildBasicCFSP(t, uint64(maxSize), int32(1000), 1)
for _, batch := range batches {
wg.Add(1)
go func(td ptrace.Traces) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ exporters:

processors:
cascading_filter/1:
collector_instances: 1
probabilistic_filtering_rate: 100
trace_reject_filters:
- name: healthcheck-rule
Expand All @@ -30,6 +31,7 @@ processors:
values:
- abc
cascading_filter/2:
collector_instances: 1
decision_wait: 10s
num_traces: 100
expected_new_traces_per_sec: 10
Expand Down

0 comments on commit 3223aa8

Please sign in to comment.