diff --git a/.changelog/1358.added.txt b/.changelog/1358.added.txt new file mode 100644 index 0000000000..65c0fe7283 --- /dev/null +++ b/.changelog/1358.added.txt @@ -0,0 +1 @@ +feat(cascadingfilter): add collector_instances config option for spans_per_second global and policy limits scaling \ No newline at end of file diff --git a/pkg/processor/cascadingfilterprocessor/README.md b/pkg/processor/cascadingfilterprocessor/README.md index 4e87d94d78..b908d3054f 100644 --- a/pkg/processor/cascadingfilterprocessor/README.md +++ b/pkg/processor/cascadingfilterprocessor/README.md @@ -20,6 +20,7 @@ The following configuration options should be configured as desired: The following configuration options can also be modified: +- `collector_instances` (default = 1): In case of multiple deployments **sharing single configuration** of the `cascadingfilter`, should be used to scale down properly `spans_per_second` global and policy limits. Value should be positive integer corresponding to the number of collectors with configured cascadingfilters e.g. `collector_instances=5`. As a result configured `spans_per_second` limit will be divided by `5` for global and policy limits. - `decision_wait` (default = 30s): Wait time since the first span of a trace before making a filtering decision - `num_traces` (default = 100000): Max number of traces for which decisions are kept in memory - `history_size` (default = `num_traces` value): Max size of LRU cache used for storing decisions on already processed traces @@ -28,6 +29,9 @@ The following configuration options can also be modified: Whenever rate limiting is applied, only full traces are accepted (if trace won't fit within the limit, it will never be filtered). For spans that are arriving late, previous decision are kept for some time. 
+In case of multiple deployments **sharing single configuration file** of the `cascadingfilter`, environment variable called `SUMO_COLLECTOR_INSTANCES` should be used to scale down properly `spans_per_second` global and policy limits. `SUMO_COLLECTOR_INSTANCES` should be positive integer corresponding to the number of collectors with configured cascadingfilters e.g. `SUMO_COLLECTOR_INSTANCES=5`. +As a result configured `spans_per_second` limit will be divided by `5` for global and policy limits. + ## Updated span attributes  The processor modifies each span attributes, by setting following two attributes: diff --git a/pkg/processor/cascadingfilterprocessor/cascade_test.go b/pkg/processor/cascadingfilterprocessor/cascade_test.go index bff10e2ef6..a49bf08cc9 100644 --- a/pkg/processor/cascadingfilterprocessor/cascade_test.go +++ b/pkg/processor/cascadingfilterprocessor/cascade_test.go @@ -34,6 +34,7 @@ var testValue = 10 * time.Millisecond var probabilisticFilteringRate = int32(10) var healthCheckPattern = "health" var cfg = cfconfig.Config{ + CollectorInstances: 1, DecisionWait: 2 * time.Second, NumTraces: 100, ExpectedNewTracesPerSec: 100, @@ -60,8 +61,9 @@ var cfg = cfconfig.Config{ } var cfgJustDropping = cfconfig.Config{ - DecisionWait: 2 * time.Second, - NumTraces: 100, + CollectorInstances: 1, + DecisionWait: 2 * time.Second, + NumTraces: 100, TraceRejectCfgs: []cfconfig.TraceRejectCfg{ { Name: "health-check", @@ -71,6 +73,7 @@ var cfgJustDropping = cfconfig.Config{ } var cfgAutoRate = cfconfig.Config{ + CollectorInstances: 1, DecisionWait: 2 * time.Second, ProbabilisticFilteringRate: &probabilisticFilteringRate, NumTraces: 100, diff --git a/pkg/processor/cascadingfilterprocessor/config/config.go b/pkg/processor/cascadingfilterprocessor/config/config.go index 29f5fcf67e..c512204abe 100644 --- a/pkg/processor/cascadingfilterprocessor/config/config.go +++ b/pkg/processor/cascadingfilterprocessor/config/config.go @@ -105,6 +105,10 @@ type TraceRejectCfg struct {
// Config holds the configuration for cascading-filter-based sampling. type Config struct { + // CollectorInstances is the number of collectors sharing single configuration for + // cascadingfilter processor. This number is used to calculate global and policy limits + // for spans_per_second. Default value is 1. + CollectorInstances uint `mapstructure:"collector_instances"` // DecisionWait is the desired wait time from the arrival of the first span of // trace until the decision about sampling it or not is evaluated. DecisionWait time.Duration `mapstructure:"decision_wait"` diff --git a/pkg/processor/cascadingfilterprocessor/config_test.go b/pkg/processor/cascadingfilterprocessor/config_test.go index 2b96776bc0..f87f5ac421 100644 --- a/pkg/processor/cascadingfilterprocessor/config_test.go +++ b/pkg/processor/cascadingfilterprocessor/config_test.go @@ -49,6 +49,7 @@ func TestLoadConfig(t *testing.T) { id1 := component.NewIDWithName("cascading_filter", "1") assert.Equal(t, cfg.Processors[id1], &cfconfig.Config{ + CollectorInstances: 1, DecisionWait: 30 * time.Second, SpansPerSecond: 0, NumTraces: 100000, @@ -101,6 +102,7 @@ func TestLoadConfig(t *testing.T) { priorHistorySize2 := uint64(100) assert.Equal(t, cfg.Processors[id2], &cfconfig.Config{ + CollectorInstances: 1, DecisionWait: 10 * time.Second, NumTraces: 100, ExpectedNewTracesPerSec: 10, diff --git a/pkg/processor/cascadingfilterprocessor/factory.go b/pkg/processor/cascadingfilterprocessor/factory.go index 98f7eb515d..b70f85dad5 100644 --- a/pkg/processor/cascadingfilterprocessor/factory.go +++ b/pkg/processor/cascadingfilterprocessor/factory.go @@ -51,9 +51,10 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &cfconfig.Config{ - DecisionWait: 30 * time.Second, - NumTraces: 100000, - SpansPerSecond: 0, + CollectorInstances: 1, + DecisionWait: 30 * time.Second, + NumTraces: 100000, + SpansPerSecond: 0, } } diff --git 
a/pkg/processor/cascadingfilterprocessor/processor.go b/pkg/processor/cascadingfilterprocessor/processor.go index 0ed22b932a..c943a9e5da 100644 --- a/pkg/processor/cascadingfilterprocessor/processor.go +++ b/pkg/processor/cascadingfilterprocessor/processor.go @@ -16,6 +16,7 @@ package cascadingfilterprocessor import ( "context" + "math" "runtime" "sync" "sync/atomic" @@ -103,6 +104,8 @@ const ( AttributeSamplingLateArrival = "sampling.late_arrival" AttributeSamplingProbability = "sampling.probability" + + defaultCollectorInstancesNo = 1 ) // newTraceProcessor returns a processor.TraceProcessor that will perform Cascading Filter according to the given @@ -126,6 +129,12 @@ func newCascadingFilterSpanProcessor(logger *zap.Logger, nextConsumer consumer.T var policies []*TraceAcceptEvaluator var dropTraceEvals []*TraceRejectEvaluator + // In case of lack of collectorInstances set default. + if cfg.CollectorInstances == 0 { + cfg.CollectorInstances = defaultCollectorInstancesNo + logger.Info("Using default collector instances", zap.Uint("value", defaultCollectorInstancesNo)) + } + // Prepare Trace Reject config for _, dropCfg := range cfg.TraceRejectCfgs { @@ -166,6 +175,13 @@ func newCascadingFilterSpanProcessor(logger *zap.Logger, nextConsumer consumer.T if err != nil { return nil, err } + + if policyCfg.SpansPerSecond > 0 { + policyCalculatedSpansPerSecond := calculateSpansPerSecond(policyCfg.SpansPerSecond, cfg.CollectorInstances) + policyCfg.SpansPerSecond = policyCalculatedSpansPerSecond + totalRate += policyCfg.SpansPerSecond + } + eval, err := buildPolicyEvaluator(logger, &policyCfg) if err != nil { return nil, err @@ -176,17 +192,20 @@ func newCascadingFilterSpanProcessor(logger *zap.Logger, nextConsumer consumer.T ctx: policyCtx, probabilisticFilter: false, } - if policyCfg.SpansPerSecond > 0 { - totalRate += policyCfg.SpansPerSecond - } + logger.Info("Adding trace accept rule", zap.String("name", policyCfg.Name), - zap.Int32("spans_per_second", 
policyCfg.SpansPerSecond)) + zap.Int32("spans_per_second", policyCfg.SpansPerSecond), + zap.Uint("collector_instances", cfg.CollectorInstances), + ) + policies = append(policies, policy) } // Recalculate the total spans per second rate if needed - spansPerSecond := cfg.SpansPerSecond + calculatedGlobalSpansPerSecond := calculateSpansPerSecond(cfg.SpansPerSecond, cfg.CollectorInstances) + cfg.SpansPerSecond = calculatedGlobalSpansPerSecond + spansPerSecond := calculatedGlobalSpansPerSecond if spansPerSecond == 0 { spansPerSecond = totalRate if cfg.ProbabilisticFilteringRate != nil && *cfg.ProbabilisticFilteringRate > 0 { @@ -195,7 +214,10 @@ func newCascadingFilterSpanProcessor(logger *zap.Logger, nextConsumer consumer.T } if spansPerSecond != 0 { - logger.Info("Setting total spans per second limit", zap.Int32("spans_per_second", spansPerSecond)) + logger.Info("Setting total spans per second limit, based on configured collector instances", + zap.Int32("spans_per_second", spansPerSecond), + zap.Uint("collector_instances", cfg.CollectorInstances), + ) } else { logger.Info("Not setting total spans per second limit (only selected traces will be filtered out)") } @@ -511,3 +533,10 @@ func (pt *policyTicker) Stop() { } var _ tTicker = (*policyTicker)(nil) + +func calculateSpansPerSecond(spansPerSecond int32, collectorInstances uint) int32 { + calculateSpansPerSecond := float64(spansPerSecond) / float64(collectorInstances) + roundedSpansPerSecond := int32(math.Ceil(calculateSpansPerSecond)) + + return roundedSpansPerSecond +} diff --git a/pkg/processor/cascadingfilterprocessor/processor_test.go b/pkg/processor/cascadingfilterprocessor/processor_test.go index 22f731f353..5ab3cb3e03 100644 --- a/pkg/processor/cascadingfilterprocessor/processor_test.go +++ b/pkg/processor/cascadingfilterprocessor/processor_test.go @@ -44,27 +44,91 @@ const ( ) //nolint:unused -var testPolicy = []cfconfig.TraceAcceptCfg{{ - Name: "test-policy", - SpansPerSecond: 1000, -}} - -func 
buildBasicCFSP(t *testing.T, numTraces uint64) *cascadingFilterSpanProcessor { - cfg := cfconfig.Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: numTraces, - ExpectedNewTracesPerSec: 64, - PolicyCfgs: testPolicy, +var testPolicy = []cfconfig.TraceAcceptCfg{ + { + Name: "test-policy", + SpansPerSecond: 100, + }, + { + Name: "test-policy2", + SpansPerSecond: 10, + }, + { + Name: "test-policy3", + SpansPerSecond: -1, + }} + +func buildBasicCFSP(t *testing.T, numTraces uint64, spansPerSecond int32, collectorInstances uint) *cascadingFilterSpanProcessor { + if collectorInstances != 1 { + cfg = cfconfig.Config{ + CollectorInstances: collectorInstances, + DecisionWait: defaultTestDecisionWait, + NumTraces: numTraces, + ExpectedNewTracesPerSec: 64, + PolicyCfgs: testPolicy, + SpansPerSecond: spansPerSecond, + } + } else { + cfg = cfconfig.Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: numTraces, + ExpectedNewTracesPerSec: 64, + PolicyCfgs: testPolicy, + SpansPerSecond: spansPerSecond, + } } + sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg, component.NewID("cascading_filter")) require.NoError(t, err) + return sp.(*cascadingFilterSpanProcessor) +} + +func TestTotalSpansPerSecondForSumoCollectorInstancesDefault(t *testing.T) { + tsp := buildBasicCFSP(t, uint64(1000), int32(1000), 1) + + require.Equal(t, tsp.decisionSpansLimitter.maxSpansPerSecond, int32(1000)) +} + +func TestTotalSpansPerSecondForSumoCollectorInstances0(t *testing.T) { + tsp := buildBasicCFSP(t, uint64(1000), int32(1000), 0) + + require.Equal(t, tsp.decisionSpansLimitter.maxSpansPerSecond, int32(1000)) +} + +func TestTotalSpansPerSecondForSumoCollectorInstances10(t *testing.T) { + tsp := buildBasicCFSP(t, uint64(1000), int32(1000), 10) + + require.Equal(t, tsp.decisionSpansLimitter.maxSpansPerSecond, int32(100)) +} + +func TestTotalSpansPerSecondForSumoCollectorInstancesRoundedUp(t *testing.T) { + tsp := buildBasicCFSP(t, uint64(1000), int32(1000), 325) + + 
require.Equal(t, tsp.decisionSpansLimitter.maxSpansPerSecond, int32(4)) +} + +func TestTotalSpansPerSecondForSumoCollectorInstances0AndNoGlobalSpansPerSecondLimit(t *testing.T) { + tsp := buildBasicCFSP(t, uint64(1000), int32(0), 0) + + require.Equal(t, tsp.decisionSpansLimitter.maxSpansPerSecond, int32(110)) +} + +func TestTotalSpansPerSecondForSumoCollectorInstances10AndNoGlobalSpansPerSecondLimit(t *testing.T) { + tsp := buildBasicCFSP(t, uint64(1000), int32(0), 10) + + require.Equal(t, tsp.decisionSpansLimitter.maxSpansPerSecond, int32(11)) +} + +func TestTotalSpansPerSecondForSumoCollectorInstances0AndGlobalSpansPerSecondLimitLowerThanPolicies(t *testing.T) { + tsp := buildBasicCFSP(t, uint64(1000), int32(60), 0) + require.Equal(t, tsp.decisionSpansLimitter.maxSpansPerSecond, int32(60)) } func TestSequentialTraceArrival(t *testing.T) { traceIds, batches := generateIdsAndBatches(128) - tsp := buildBasicCFSP(t, uint64(2*len(traceIds))) + tsp := buildBasicCFSP(t, uint64(2*len(traceIds)), int32(1000), 1) for _, batch := range batches { assert.NoError(t, tsp.ConsumeTraces(context.Background(), batch)) } @@ -78,7 +142,7 @@ func TestSequentialTraceArrival(t *testing.T) { } func TestDecisionHistory(t *testing.T) { - tsp := buildBasicCFSP(t, uint64(100)) + tsp := buildBasicCFSP(t, uint64(100), int32(1000), 1) id1 := bigendianconverter.UInt64ToTraceID(1, uint64(100)) id2 := bigendianconverter.UInt64ToTraceID(1, uint64(101)) @@ -104,7 +168,7 @@ func TestDecisionHistory(t *testing.T) { func TestConcurrentTraceArrival(t *testing.T) { traceIds, batches := generateIdsAndBatches(64) - tsp := buildBasicCFSP(t, uint64(2*len(traceIds))) + tsp := buildBasicCFSP(t, uint64(2*len(traceIds)), int32(1000), 1) var wg sync.WaitGroup for _, batch := range batches { @@ -137,7 +201,7 @@ func TestConcurrentTraceArrival(t *testing.T) { func TestSequentialTraceMapSize(t *testing.T) { traceIds, batches := generateIdsAndBatches(210) const maxSize = 100 - tsp := buildBasicCFSP(t, 
uint64(maxSize)) + tsp := buildBasicCFSP(t, uint64(maxSize), int32(1000), 1) for _, batch := range batches { if err := tsp.ConsumeTraces(context.Background(), batch); err != nil { @@ -156,7 +220,7 @@ func TestConcurrentTraceMapSize(t *testing.T) { _, batches := generateIdsAndBatches(64) const maxSize = 50 var wg sync.WaitGroup - tsp := buildBasicCFSP(t, uint64(maxSize)) + tsp := buildBasicCFSP(t, uint64(maxSize), int32(1000), 1) for _, batch := range batches { wg.Add(1) go func(td ptrace.Traces) { diff --git a/pkg/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml b/pkg/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml index 81015cec68..ac3f638776 100644 --- a/pkg/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml +++ b/pkg/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml @@ -6,6 +6,7 @@ exporters: processors: cascading_filter/1: + collector_instances: 1 probabilistic_filtering_rate: 100 trace_reject_filters: - name: healthcheck-rule @@ -30,6 +31,7 @@ processors: values: - abc cascading_filter/2: + collector_instances: 1 decision_wait: 10s num_traces: 100 expected_new_traces_per_sec: 10