Skip to content

Commit

Permalink
Reduce allocations/cpu from scraping
Browse files Browse the repository at this point in the history
This is a follow-up to the prior streaming-collection PR: it reduces
the CPU usage and allocations incurred during scraping.

benchmark                           old ns/op     new ns/op     delta
BenchmarkIterator_TimeSeries-10     365           119           -67.39%

benchmark                           old allocs     new allocs     delta
BenchmarkIterator_TimeSeries-10     9              0              -100.00%

benchmark                           old bytes     new bytes     delta
BenchmarkIterator_TimeSeries-10     480           0             -100.00%
  • Loading branch information
jwilder committed Sep 20, 2024
1 parent f13484e commit 64d1ed5
Show file tree
Hide file tree
Showing 28 changed files with 1,019 additions and 345 deletions.
42 changes: 21 additions & 21 deletions collector/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (t *OltpMetricWriter) Write(ctx context.Context, msg *v1.ExportMetricsServi
return nil
}

func (t *OltpMetricWriter) addSeriesAndFlushIfNecessary(ctx context.Context, wr *prompb.WriteRequest, series prompb.TimeSeries) error {
func (t *OltpMetricWriter) addSeriesAndFlushIfNecessary(ctx context.Context, wr *prompb.WriteRequest, series *prompb.TimeSeries) error {
series = t.requestTransformer.TransformTimeSeries(series)
wr.Timeseries = append(wr.Timeseries, series)
if len(wr.Timeseries) >= t.maxBatchSize {
Expand Down Expand Up @@ -226,7 +226,7 @@ func (t *OltpMetricWriter) sendBatch(ctx context.Context, wr *prompb.WriteReques
func (t *OltpMetricWriter) addOltpNumberPoints(ctx context.Context, name string, points []*metricsv1.NumberDataPoint, wr *prompb.WriteRequest) error {
for _, point := range points {
series := newSeries(name, point.Attributes)
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: unixNanoToUnixMillis(point.TimeUnixNano),
Value: asFloat64(point),
Expand All @@ -245,7 +245,7 @@ func (t *OltpMetricWriter) addOltpHistogramPoints(ctx context.Context, name stri

// Add count series
series := newSeries(fmt.Sprintf("%s_count", name), point.Attributes)
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: float64(point.Count),
Expand All @@ -258,7 +258,7 @@ func (t *OltpMetricWriter) addOltpHistogramPoints(ctx context.Context, name stri
// Add sum series, if present
if point.Sum != nil {
series := newSeries(fmt.Sprintf("%s_sum", name), point.Attributes)
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: float64(*point.Sum),
Expand All @@ -282,11 +282,11 @@ func (t *OltpMetricWriter) addOltpHistogramPoints(ctx context.Context, name stri
}

series := newSeries(fmt.Sprintf("%s_bucket", name), point.Attributes)
series.Labels = append(series.Labels, prompb.Label{
series.Labels = append(series.Labels, &prompb.Label{
Name: []byte("le"),
Value: []byte(upperBound),
})
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: float64(point.BucketCounts[i]),
Expand All @@ -312,7 +312,7 @@ func (t *OltpMetricWriter) addOltpExpHistogramPoints(ctx context.Context, name s

// Add count series
series := newSeries(fmt.Sprintf("%s_count", name), point.Attributes)
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: float64(point.Count),
Expand All @@ -325,7 +325,7 @@ func (t *OltpMetricWriter) addOltpExpHistogramPoints(ctx context.Context, name s
// Add sum series, if present
if point.Sum != nil {
series := newSeries(fmt.Sprintf("%s_sum", name), point.Attributes)
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: float64(*point.Sum),
Expand All @@ -349,11 +349,11 @@ func (t *OltpMetricWriter) addOltpExpHistogramPoints(ctx context.Context, name s
upperBound := fmt.Sprintf("%f", math.Pow(-base, float64(bucketIdx)))

series := newSeries(fmt.Sprintf("%s_bucket", name), point.Attributes)
series.Labels = append(series.Labels, prompb.Label{
series.Labels = append(series.Labels, &prompb.Label{
Name: []byte("le"),
Value: []byte(upperBound),
})
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: float64(buckets.BucketCounts[i]),
Expand All @@ -376,11 +376,11 @@ func (t *OltpMetricWriter) addOltpExpHistogramPoints(ctx context.Context, name s
upperBound := fmt.Sprintf("%f", math.Pow(base, float64(bucketIdx+1)))

series := newSeries(fmt.Sprintf("%s_bucket", name), point.Attributes)
series.Labels = append(series.Labels, prompb.Label{
series.Labels = append(series.Labels, &prompb.Label{
Name: []byte("le"),
Value: []byte(upperBound),
})
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: float64(buckets.BucketCounts[i]),
Expand All @@ -401,7 +401,7 @@ func (t *OltpMetricWriter) addOltpSummaryPoints(ctx context.Context, name string

// Add count series
series := newSeries(fmt.Sprintf("%s_count", name), point.Attributes)
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: float64(point.Count),
Expand All @@ -412,7 +412,7 @@ func (t *OltpMetricWriter) addOltpSummaryPoints(ctx context.Context, name string
}

series = newSeries(fmt.Sprintf("%s_sum", name), point.Attributes)
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: point.Sum,
Expand All @@ -425,11 +425,11 @@ func (t *OltpMetricWriter) addOltpSummaryPoints(ctx context.Context, name string
// Add quantile series
for _, quantile := range point.QuantileValues {
series := newSeries(name, point.Attributes)
series.Labels = append(series.Labels, prompb.Label{
series.Labels = append(series.Labels, &prompb.Label{
Name: []byte("quantile"),
Value: []byte(fmt.Sprintf("%f", quantile.Quantile)),
})
series.Samples = []prompb.Sample{
series.Samples = []*prompb.Sample{
{
Timestamp: timestamp,
Value: quantile.Value,
Expand All @@ -443,18 +443,18 @@ func (t *OltpMetricWriter) addOltpSummaryPoints(ctx context.Context, name string
return nil
}

func newSeries(name string, attributes []*commonv1.KeyValue) prompb.TimeSeries {
ts := prompb.TimeSeries{
Labels: make([]prompb.Label, 0, len(attributes)+1),
func newSeries(name string, attributes []*commonv1.KeyValue) *prompb.TimeSeries {
ts := &prompb.TimeSeries{
Labels: make([]*prompb.Label, 0, len(attributes)+1),
}

ts.Labels = append(ts.Labels, prompb.Label{
ts.Labels = append(ts.Labels, &prompb.Label{
Name: []byte("__name__"),
Value: []byte(name),
})

for _, l := range attributes {
ts.Labels = append(ts.Labels, prompb.Label{
ts.Labels = append(ts.Labels, &prompb.Label{
Name: []byte(l.Key),
Value: []byte(l.Value.String()),
})
Expand Down
40 changes: 22 additions & 18 deletions collector/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ type Scraper struct {

mu sync.RWMutex
targets map[string]ScrapeTarget

wr *prompb.WriteRequest
}

func NewScraper(opts *ScraperOpts) *Scraper {
Expand Down Expand Up @@ -214,17 +216,21 @@ func (s *Scraper) scrapeTargets(ctx context.Context) {
continue
}
for iter.Next() {
ts, err := iter.TimeSeries()
pt := prompb.TimeSeriesPool.Get().(*prompb.TimeSeries)
pt.Reset()
ts, err := iter.TimeSeriesInto(pt)
if err != nil {
logger.Errorf("Failed to get value: %s", err.Error())
logger.Errorf("Failed to parse series %s: %s", target.Addr, err.Error())
continue
}

name := prompb.MetricName(ts)
if s.requestTransformer.ShouldDropMetric(ts, name) {
prompb.TimeSeriesPool.Put(ts)
metrics.MetricsDroppedTotal.WithLabelValues(string(name)).Add(1)
continue
}

for i, s := range ts.Samples {
if s.Timestamp == 0 {
s.Timestamp = scrapeTime
Expand All @@ -233,25 +239,17 @@ func (s *Scraper) scrapeTargets(ctx context.Context) {
}

if target.Namespace != "" {
ts.Labels = append(ts.Labels, prompb.Label{
Name: []byte("adxmon_namespace"),
Value: []byte(target.Namespace),
})
ts.AppendLabelString("adxmon_namespace", target.Namespace)
}

if target.Pod != "" {
ts.Labels = append(ts.Labels, prompb.Label{
Name: []byte("adxmon_pod"),
Value: []byte(target.Pod),
})
ts.AppendLabelString("adxmon_pod", target.Pod)
}

if target.Container != "" {
ts.Labels = append(ts.Labels, prompb.Label{
Name: []byte("adxmon_container"),
Value: []byte(target.Container),
})
ts.AppendLabelString("adxmon_container", target.Container)
}

prompb.Sort(ts.Labels)

ts = s.requestTransformer.TransformTimeSeries(ts)
Expand All @@ -276,18 +274,22 @@ func (s *Scraper) scrapeTargets(ctx context.Context) {
}

func (s *Scraper) flushBatchIfNecessary(ctx context.Context, wr *prompb.WriteRequest) *prompb.WriteRequest {
filtered := *wr
filtered := wr
if len(filtered.Timeseries) >= s.opts.MaxBatchSize {
filtered = s.requestTransformer.TransformWriteRequest(filtered)
filtered = s.requestTransformer.TransformWriteRequest(wr)
}

if len(filtered.Timeseries) >= s.opts.MaxBatchSize {
if err := s.sendBatch(ctx, &filtered); err != nil {
if err := s.sendBatch(ctx, filtered); err != nil {
logger.Errorf(err.Error())
}
for i := range filtered.Timeseries {
ts := filtered.Timeseries[i]
prompb.TimeSeriesPool.Put(ts)
}
filtered.Timeseries = filtered.Timeseries[:0]
}
return &filtered
return filtered
}

func (s *Scraper) sendBatch(ctx context.Context, wr *prompb.WriteRequest) error {
Expand Down Expand Up @@ -596,3 +598,5 @@ func getTargetAnnotationMapOrDefault(p *v1.Pod, key string, defaultVal map[strin
}
return parsedMap
}

var adxmonNamespaceLabel = []byte("adxmon_namespace")
12 changes: 6 additions & 6 deletions collector/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func TestScraper_sendBatch(t *testing.T) {
RemoteClient: &fakeClient{expectedSamples: 1},
},
writeRequest: &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: []byte("testLabel"), Value: []byte("testValue")},
},
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{Value: 1, Timestamp: 123456789},
},
},
Expand All @@ -60,12 +60,12 @@ func TestScraper_sendBatch(t *testing.T) {
RemoteClient: &fakeClient{expectedSamples: 0},
},
writeRequest: &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
Labels: []*prompb.Label{
{Name: []byte("testLabel"), Value: []byte("testValue")},
},
Samples: []prompb.Sample{
Samples: []*prompb.Sample{
{Value: 1, Timestamp: 123456789},
},
},
Expand Down
12 changes: 6 additions & 6 deletions collector/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,30 @@ type seriesCreator struct{}

func (s *seriesCreator) newSeries(name string, scrapeTarget ScrapeTarget, m *io_prometheus_client.Metric) prompb.TimeSeries {
ts := prompb.TimeSeries{
Labels: make([]prompb.Label, 0, len(m.Label)+3),
Labels: make([]*prompb.Label, 0, len(m.Label)+3),
}

ts.Labels = append(ts.Labels, prompb.Label{
ts.Labels = append(ts.Labels, &prompb.Label{
Name: []byte("__name__"),
Value: []byte(name),
})

if scrapeTarget.Namespace != "" {
ts.Labels = append(ts.Labels, prompb.Label{
ts.Labels = append(ts.Labels, &prompb.Label{
Name: []byte("adxmon_namespace"),
Value: []byte(scrapeTarget.Namespace),
})
}

if scrapeTarget.Pod != "" {
ts.Labels = append(ts.Labels, prompb.Label{
ts.Labels = append(ts.Labels, &prompb.Label{
Name: []byte("adxmon_pod"),
Value: []byte(scrapeTarget.Pod),
})
}

if scrapeTarget.Container != "" {
ts.Labels = append(ts.Labels, prompb.Label{
ts.Labels = append(ts.Labels, &prompb.Label{
Name: []byte("adxmon_container"),
Value: []byte(scrapeTarget.Container),
})
Expand All @@ -43,7 +43,7 @@ func (s *seriesCreator) newSeries(name string, scrapeTarget ScrapeTarget, m *io_
continue
}

ts.Labels = append(ts.Labels, prompb.Label{
ts.Labels = append(ts.Labels, &prompb.Label{
Name: []byte(l.GetName()),
Value: []byte(l.GetValue()),
})
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/tenebris-tech/tail v1.0.5
github.com/traefik/yaegi v0.15.1
github.com/urfave/cli/v2 v2.27.4
github.com/valyala/fastjson v1.6.4
golang.org/x/net v0.29.0
golang.org/x/sync v0.8.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ github.com/traefik/yaegi v0.15.1 h1:YA5SbaL6HZA0Exh9T/oArRHqGN2HQ+zgmCY7dkoTXu4=
github.com/traefik/yaegi v0.15.1/go.mod h1:AVRxhaI2G+nUsaM1zyktzwXn69G3t/AuTDrCiTds9p0=
github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8=
github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ=
github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
Expand Down
6 changes: 3 additions & 3 deletions ingestor/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
v12 "k8s.io/client-go/listers/core/v1"
)

type TimeSeriesWriter func(ctx context.Context, ts []prompb.TimeSeries) error
type TimeSeriesWriter func(ctx context.Context, ts []*prompb.TimeSeries) error

type OTLPLogsWriter func(ctx context.Context, database, table string, logs *otlp.Logs) error

Expand All @@ -33,7 +33,7 @@ type Coordinator interface {
IsLeader() bool

// Write writes the time series to the correct peer.
Write(ctx context.Context, wr prompb.WriteRequest) error
Write(ctx context.Context, wr *prompb.WriteRequest) error

// WriteOTLPLogs writes the logs to the correct peer.
WriteOTLPLogs(ctx context.Context, database, table string, logs *otlp.Logs) error
Expand Down Expand Up @@ -229,7 +229,7 @@ func (c *coordinator) Close() error {
return nil
}

func (c *coordinator) Write(ctx context.Context, wr prompb.WriteRequest) error {
func (c *coordinator) Write(ctx context.Context, wr *prompb.WriteRequest) error {
return c.tsw(ctx, wr.Timeseries)
}

Expand Down
Loading

0 comments on commit 64d1ed5

Please sign in to comment.