Skip to content

Commit

Permalink
feat: flag to enable gauge replacement
Browse files Browse the repository at this point in the history
Signed-off-by: Fred Cox <[email protected]>
  • Loading branch information
mcfedr committed Jan 4, 2024
1 parent 27b0fd3 commit 9626541
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 19 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Flags:
Example: "user1=pass1,user2=pass2"
--apiListen string Listen for API requests on this host/port. (default ":80")
--cors string The 'Access-Control-Allow-Origin' value to be returned. (default "*")
--gaugeBehavior string How gauges are aggregated (sum or replace). (default "sum")
-h, --help help for prom-aggregation-gateway
--lifecycleListen string Listen for lifecycle requests (health, metrics) on this host/port (default ":8888")

Expand All @@ -73,6 +74,15 @@ Use "prom-aggregation-gateway [command] --help" for more information about a com

Any flags you see above can also be set by `ENV_VARIABLES`. ENV_VARS must have a prefix of `PAG_`, for example `PAG_AUTHUSERS=user1=pass1,user2=pass2` will start the service with basic auth. If an ENV_VARIABLE is set than it will be used over a CLI argument passed to the service.

### Gauge Behavior

Gauges can be aggregated in two ways:

1. `sum` (default) - just like counters, by summing the values received.
2. `replace` - every new value replaces the current value, scrape will always see the latest value.

set this with the flag `--gaugeBehavior=XXX` or as an env var `PAG_GAUGEBEHAVIOR=XXX`.

## Ready-built images

Container images are published here:
Expand Down
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func Execute() {
rootCmd.PersistentFlags().StringVar(&cfg.ApiListen, "apiListen", ":80", "Listen for API requests on this host/port.")
rootCmd.PersistentFlags().StringVar(&cfg.LifecycleListen, "lifecycleListen", ":8888", "Listen for lifecycle requests (health, metrics) on this host/port")
rootCmd.PersistentFlags().StringVar(&cfg.CorsDomain, "cors", "*", "The 'Access-Control-Allow-Origin' value to be returned.")
rootCmd.PersistentFlags().StringVar(&cfg.GaugeBehavior, "gaugeBehavior", "sum", "How gauges are aggregated (sum or replace).")

if err := rootCmd.Execute(); err != nil {
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func startFunc(cmd *cobra.Command, args []string) error {
Accounts: cfg.AuthUsers,
}

routers.RunServers(apiCfg, cfg.ApiListen, cfg.LifecycleListen)
routers.RunServers(apiCfg, cfg.ApiListen, cfg.LifecycleListen, cfg.GaugeBehavior)

return nil
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Server struct {
LifecycleListen string
CorsDomain string
AuthUsers []string
GaugeBehavior string
}

const (
Expand Down
23 changes: 21 additions & 2 deletions metrics/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

type metricFamily struct {
*dto.MetricFamily
lock sync.RWMutex
lock sync.RWMutex
options *aggregateOptions
}

type Aggregate struct {
Expand All @@ -28,9 +29,17 @@ type Aggregate struct {

type ignoredLabels []string

type gaugeBehavior string

const (
sumBehavior gaugeBehavior = "sum"
replaceBehavior = "replace"
)

type aggregateOptions struct {
ignoredLabels ignoredLabels
metricTTLDuration *time.Duration
gaugeBehavior gaugeBehavior
}

type aggregateOptionsFunc func(a *Aggregate)
Expand All @@ -47,6 +56,16 @@ func SetTTLMetricTime(duration *time.Duration) aggregateOptionsFunc {
}
}

func SetGaugeBehavior(behavior string) aggregateOptionsFunc {
return func(a *Aggregate) {
if behavior == replaceBehavior {
a.options.gaugeBehavior = replaceBehavior
} else {
a.options.gaugeBehavior = sumBehavior
}
}
}

func NewAggregate(opts ...aggregateOptionsFunc) *Aggregate {
a := &Aggregate{
families: map[string]*metricFamily{},
Expand Down Expand Up @@ -91,7 +110,7 @@ func (a *Aggregate) setFamilyOrGetExistingFamily(familyName string, family *dto.
defer a.familiesLock.Unlock()
existingFamily, ok := a.families[familyName]
if !ok {
a.families[familyName] = &metricFamily{MetricFamily: family}
a.families[familyName] = &metricFamily{MetricFamily: family, options: &a.options}
return nil
}
return existingFamily
Expand Down
48 changes: 39 additions & 9 deletions metrics/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,28 @@ histogram_bucket{job="test",le="+Inf"} 9
histogram_sum{job="test"} 7
histogram_count{job="test"} 2
`

wantReplace = `# HELP counter A counter
# TYPE counter counter
counter{job="test"} 60
# HELP gauge A gauge
# TYPE gauge gauge
gauge{job="test"} 57
# HELP histogram A histogram
# TYPE histogram histogram
histogram_bucket{job="test",le="1"} 0
histogram_bucket{job="test",le="2"} 0
histogram_bucket{job="test",le="3"} 3
histogram_bucket{job="test",le="4"} 8
histogram_bucket{job="test",le="5"} 9
histogram_bucket{job="test",le="6"} 9
histogram_bucket{job="test",le="7"} 9
histogram_bucket{job="test",le="8"} 9
histogram_bucket{job="test",le="9"} 9
histogram_bucket{job="test",le="10"} 9
histogram_bucket{job="test",le="+Inf"} 9
histogram_sum{job="test"} 7
histogram_count{job="test"} 2
`
multilabel1 = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b", ignore_label="ignore_value"} 1
Expand Down Expand Up @@ -121,6 +142,12 @@ ui_external_lib_loaded{name="mixpanel",loaded="true"} 1
ui_external_lib_loaded{job="test",loaded="true",name="Intercom"} 2
ui_external_lib_loaded{job="test",loaded="true",name="ga"} 2
ui_external_lib_loaded{job="test",loaded="true",name="mixpanel"} 2
`
gaugeOutputReplace = `# HELP ui_external_lib_loaded A gauge with entries in un-sorted order
# TYPE ui_external_lib_loaded gauge
ui_external_lib_loaded{job="test",loaded="true",name="Intercom"} 1
ui_external_lib_loaded{job="test",loaded="true",name="ga"} 1
ui_external_lib_loaded{job="test",loaded="true",name="mixpanel"} 1
`
duplicateLabels = `
# HELP ui_external_lib_loaded Test with duplicate values
Expand Down Expand Up @@ -179,17 +206,20 @@ func TestAggregate(t *testing.T) {
a, b string
want string
ignoredLabels []string
behavior string
}{
{"simpleGauge", gaugeInput, gaugeInput, gaugeOutput, []string{}},
{"in", in1, in2, want, []string{}},
{"multilabel", multilabel1, multilabel2, multilabelResult, []string{"ignore_label"}},
{"labelFields", labelFields1, labelFields2, labelFieldResult, []string{}},
{"reorderedLabels", reorderedLabels1, reorderedLabels2, reorderedLabelsResult, []string{}},
{"ignoredLabels", ignoredLabels1, ignoredLabels2, ignoredLabelsResult, []string{"ignore_me"}},
{"summary", summaryInput, summaryInput, summaryOutput, []string{}},
{"simpleGauge", gaugeInput, gaugeInput, gaugeOutput, []string{}, "sum"},
{"simpleGaugeReplace", gaugeInput, gaugeInput, gaugeOutputReplace, []string{}, "replace"},
{"in", in1, in2, want, []string{}, "sum"},
{"inReplace", in1, in2, wantReplace, []string{}, "replace"},
{"multilabel", multilabel1, multilabel2, multilabelResult, []string{"ignore_label"}, "sum"},
{"labelFields", labelFields1, labelFields2, labelFieldResult, []string{}, "sum"},
{"reorderedLabels", reorderedLabels1, reorderedLabels2, reorderedLabelsResult, []string{}, "sum"},
{"ignoredLabels", ignoredLabels1, ignoredLabels2, ignoredLabelsResult, []string{"ignore_me"}, "sum"},
{"summary", summaryInput, summaryInput, summaryOutput, []string{}, "sum"},
} {
t.Run(c.testName, func(t *testing.T) {
agg := NewAggregate(AddIgnoredLabels(c.ignoredLabels...))
agg := NewAggregate(AddIgnoredLabels(c.ignoredLabels...), SetGaugeBehavior(c.behavior))

err := agg.parseAndMerge(strings.NewReader(c.a), testLabels)
require.NoError(t, err)
Expand Down
15 changes: 10 additions & 5 deletions metrics/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func mergeBuckets(a, b []*dto.Bucket) []*dto.Bucket {
return output
}

func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
func mergeMetric(ty dto.MetricType, a, b *dto.Metric, options *aggregateOptions) *dto.Metric {
switch ty {
case dto.MetricType_COUNTER:
return &dto.Metric{
Expand All @@ -82,9 +82,14 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
}

case dto.MetricType_GAUGE:
// No very meaningful way for us to merge gauges. We'll sum them
// and clear out any gauges on scrape, as a best approximation, but
// this relies on client pushing with the same interval as we scrape.
if options.gaugeBehavior == replaceBehavior {
return &dto.Metric{
Label: a.Label,
Gauge: &dto.Gauge{
Value: float64ptr(*b.Gauge.Value),
},
}
}
return &dto.Metric{
Label: a.Label,
Gauge: &dto.Gauge{
Expand Down Expand Up @@ -143,7 +148,7 @@ func (mf *metricFamily) mergeFamily(b *dto.MetricFamily) error {
newMetric = append(newMetric, b.Metric[j])
j++
} else {
merged := mergeMetric(*mf.Type, mf.Metric[i], b.Metric[j])
merged := mergeMetric(*mf.Type, mf.Metric[i], b.Metric[j], mf.options)
if merged != nil {
newMetric = append(newMetric, merged)
}
Expand Down
4 changes: 2 additions & 2 deletions routers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"github.com/zapier/prom-aggregation-gateway/metrics"
)

func RunServers(cfg ApiRouterConfig, apiListen string, lifecycleListen string) {
func RunServers(cfg ApiRouterConfig, apiListen string, lifecycleListen string, gaugeBehavior string) {
sigChannel := make(chan os.Signal, 1)
signal.Notify(sigChannel, syscall.SIGTERM, syscall.SIGINT)

agg := metrics.NewAggregate()
agg := metrics.NewAggregate(metrics.SetGaugeBehavior(gaugeBehavior))

promMetricsConfig := promMetrics.Config{
Registry: metrics.PromRegistry,
Expand Down

0 comments on commit 9626541

Please sign in to comment.