Reduce allocations/cpu from scraping
This is a follow-up to the prior streaming collection PR; it reduces the
CPU and allocations incurred during scraping.

benchmark                           old ns/op     new ns/op     delta
BenchmarkIterator_TimeSeries-10     365           119           -67.39%

benchmark                           old allocs     new allocs     delta
BenchmarkIterator_TimeSeries-10     9              0              -100.00%

benchmark                           old bytes     new bytes     delta
BenchmarkIterator_TimeSeries-10     480           0             -100.00%
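
For orientation, a minimal sketch of the reuse pattern this commit applies in collector/scraper.go: take a TimeSeries from the new TimeSeriesPool, reset it, parse into it with TimeSeriesInto, and hand every series back to the pool once the batch has been flushed. The package name, import path, and the collectInto/releaseBatch helpers are illustrative only, not part of the commit.

package example

// Assumed import path; the real module path comes from this repo's go.mod.
import "github.com/Azure/adx-mon/pkg/prompb"

// collectInto drains one scrape body into wr, reusing pooled TimeSeries
// values so the hot path allocates nothing per sample.
func collectInto(iter *prompb.Iterator, wr *prompb.WriteRequest) error {
	for iter.Next() {
		pt := prompb.TimeSeriesPool.Get().(*prompb.TimeSeries)
		pt.Reset() // truncate labels/samples but keep their capacity
		ts, err := iter.TimeSeriesInto(pt)
		if err != nil {
			prompb.TimeSeriesPool.Put(pt)
			continue
		}
		wr.Timeseries = append(wr.Timeseries, *ts)
	}
	return iter.Close()
}

// releaseBatch returns the series to the pool after the batch has been sent,
// mirroring the new loop in flushBatchIfNecessary.
func releaseBatch(wr *prompb.WriteRequest) {
	for i := range wr.Timeseries {
		prompb.TimeSeriesPool.Put(&wr.Timeseries[i])
	}
	wr.Timeseries = wr.Timeseries[:0]
}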
jwilder committed Sep 16, 2024
1 parent 43bc752 commit f17b1f8
Showing 9 changed files with 673 additions and 25 deletions.
18 changes: 13 additions & 5 deletions collector/scraper.go
@@ -122,6 +122,8 @@ type Scraper struct {

mu sync.RWMutex
targets map[string]ScrapeTarget

wr *prompb.WriteRequest
}

func NewScraper(opts *ScraperOpts) *Scraper {
@@ -213,14 +215,16 @@ func (s *Scraper) scrapeTargets(ctx context.Context) {
continue
}
for iter.Next() {
ts, err := iter.TimeSeries()
pt := prompb.TimeSeriesPool.Get().(*prompb.TimeSeries)
pt.Reset()
ts, err := iter.TimeSeriesInto(pt)
if err != nil {
logger.Errorf("Failed to get value: %s", err.Error())
continue
}

name := prompb.MetricName(ts)
if s.requestTransformer.ShouldDropMetric(ts, name) {
name := prompb.MetricName(*ts)
if s.requestTransformer.ShouldDropMetric(*ts, name) {
metrics.MetricsDroppedTotal.WithLabelValues(string(name)).Add(1)
continue
}
@@ -253,8 +257,8 @@ func (s *Scraper) scrapeTargets(ctx context.Context) {
}
prompb.Sort(ts.Labels)

ts = s.requestTransformer.TransformTimeSeries(ts)
wr.Timeseries = append(wr.Timeseries, ts)
tts := s.requestTransformer.TransformTimeSeries(*ts)
wr.Timeseries = append(wr.Timeseries, tts)
wr = s.flushBatchIfNecessary(ctx, wr)
}
if err := iter.Close(); err != nil {
@@ -280,6 +284,10 @@ func (s *Scraper) flushBatchIfNecessary(ctx context.Context, wr *prompb.WriteReq
if err := s.sendBatch(ctx, &filtered); err != nil {
logger.Errorf(err.Error())
}
for i := range filtered.Timeseries {
ts := &filtered.Timeseries[i]
prompb.TimeSeriesPool.Put(ts)
}
filtered.Timeseries = filtered.Timeseries[:0]
}
return &filtered
1 change: 1 addition & 0 deletions go.mod
@@ -31,6 +31,7 @@ require (
github.com/tenebris-tech/tail v1.0.5
github.com/traefik/yaegi v0.15.1
github.com/urfave/cli/v2 v2.27.4
github.com/valyala/fastjson v1.6.4
golang.org/x/net v0.29.0
golang.org/x/sync v0.8.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80
2 changes: 2 additions & 0 deletions go.sum
@@ -201,6 +201,8 @@ github.com/traefik/yaegi v0.15.1 h1:YA5SbaL6HZA0Exh9T/oArRHqGN2HQ+zgmCY7dkoTXu4=
github.com/traefik/yaegi v0.15.1/go.mod h1:AVRxhaI2G+nUsaM1zyktzwXn69G3t/AuTDrCiTds9p0=
github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8=
github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ=
github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
123 changes: 107 additions & 16 deletions pkg/prompb/iterator.go
@@ -7,18 +7,23 @@ import (
"strconv"
"strings"
"unicode"

"github.com/valyala/fastjson/fastfloat"
)

type Iterator struct {
r io.ReadCloser
scanner *bufio.Scanner
current string

buf []byte
}

func NewIterator(r io.ReadCloser) *Iterator {
return &Iterator{
r: r,
scanner: bufio.NewScanner(r),
buf: make([]byte, 0, 1024),
}
}

@@ -41,7 +46,14 @@ func (i *Iterator) TimeSeries() (TimeSeries, error) {
if len(i.current) == 0 {
return TimeSeries{}, fmt.Errorf("no current value")
}
return ParseTimeSeries(i.current)
return i.ParseTimeSeries(i.current)
}

func (i *Iterator) TimeSeriesInto(ts *TimeSeries) (*TimeSeries, error) {
if len(i.current) == 0 {
return nil, fmt.Errorf("no current value")
}
return i.ParseTimeSeriesInto(ts, i.current)
}

func (i *Iterator) Close() error {
@@ -60,7 +72,7 @@ func (i *Iterator) Reset(r io.ReadCloser) {

func (i *Iterator) isComment(s string) bool {
for j := 0; j < len(s); j++ {
if unicode.IsSpace(rune(s[j])) {
if isSpace(s[j]) {
continue
}

@@ -75,14 +87,66 @@

func (i *Iterator) isSpace(s string) bool {
for j := 0; j < len(s); j++ {
if !unicode.IsSpace(rune(s[j])) {
if !isSpace(s[j]) {
return false
}
}
return true
}

func ParseTimeSeries(line string) (TimeSeries, error) {
func (i *Iterator) ParseTimeSeriesInto(ts *TimeSeries, line string) (*TimeSeries, error) {
var (
name string
err error
)
name, line = parseName(line)

if cap(ts.Labels) > len(ts.Labels) {
ts.Labels = ts.Labels[:len(ts.Labels)+1]
} else {
ts.Labels = append(ts.Labels, Label{})
}

ts.Labels[len(ts.Labels)-1].Name = nameBytes

if cap(ts.Labels[len(ts.Labels)-1].Value) >= len(name) {
ts.Labels[len(ts.Labels)-1].Value = append(ts.Labels[len(ts.Labels)-1].Value[:0], name...)
} else {
ts.Labels[len(ts.Labels)-1].Value = make([]byte, len(name))
copy(ts.Labels[len(ts.Labels)-1].Value, name)
}

ts.Labels, line, err = i.parseLabels(ts.Labels, line)
if err != nil {
return nil, err
}

v, line := parseValue(line)

t, line := parseTimestamp(line)

value, err := fastfloat.Parse(v)
if err != nil {
return nil, fmt.Errorf("invalid value: %v", err)
}

var timestamp int64
if len(t) > 0 {
timestamp = fastfloat.ParseInt64BestEffort(t)
}

if cap(ts.Samples) > len(ts.Samples) {
ts.Samples = ts.Samples[:len(ts.Samples)+1]
} else {
ts.Samples = append(ts.Samples, Sample{})
}
ts.Samples[len(ts.Samples)-1].Value = value
ts.Samples[len(ts.Samples)-1].Timestamp = timestamp

return ts, nil
}

func (i *Iterator) ParseTimeSeries(line string) (TimeSeries, error) {
var (
name string
err error
@@ -95,7 +159,7 @@ func ParseTimeSeries(line string) (TimeSeries, error) {
Value: []byte(name),
})

labels, line, err = parseLabels(labels, line)
labels, line, err = i.parseLabels(labels, line)
if err != nil {
return TimeSeries{}, err
}
@@ -104,14 +168,14 @@

ts, line := parseTimestamp(line)

value, err := strconv.ParseFloat(string(v), 64)
value, err := strconv.ParseFloat(v, 64)
if err != nil {
return TimeSeries{}, fmt.Errorf("invalid value: %v", err)
}

var timestamp int64
if len(ts) > 0 {
timestamp, err = strconv.ParseInt(string(ts), 10, 64)
timestamp, err = strconv.ParseInt(ts, 10, 64)
if err != nil {
return TimeSeries{}, fmt.Errorf("invalid timestamp: %v", err)
}
@@ -148,7 +212,7 @@ func parseValue(line string) (string, string) {
return line, ""
}

func parseLabels(labels Labels, line string) (Labels, string, error) {
func (i *Iterator) parseLabels(labels Labels, line string) (Labels, string, error) {
orig := line
line = trimSpacePrefix(line)
if len(line) == 0 {
@@ -162,12 +226,33 @@ func parseLabels(labels Labels, line string) (Labels, string, error) {
return labels, line[1:], nil
}

idx := strings.Index(line, "=")
var idx = -1
for i := 0; i < len(line); i++ {
if line[i] == '=' {
idx = i
break
}
}
if idx == -1 {
return nil, "", fmt.Errorf("invalid label: no =: %s", orig)
}

var l Label
if cap(labels) > len(labels) {
labels = labels[:len(labels)+1]
} else {
labels = append(labels, Label{})
}
l = labels[len(labels)-1]

key := line[:idx]
if cap(l.Name) >= len(key) {
l.Name = append(l.Name[:0], key...)
} else {
n := make([]byte, len(key))
copy(n, key)
l.Name = n
}
line = line[idx+1:]

if len(line) == 0 {
@@ -177,7 +262,7 @@ func parseLabels(labels Labels, line string) (Labels, string, error) {
line = line[1:]
}

value := make([]byte, 0, 64)
value := i.buf[:0]
var j int
for j < len(line) {
if line[j] == '\\' {
@@ -203,10 +288,14 @@ func parseLabels(labels Labels, line string) (Labels, string, error) {
j += 1
}

labels = append(labels, Label{
Name: []byte(key),
Value: value,
})
if cap(l.Value) >= len(value) {
l.Value = append(l.Value[:0], value...)
} else {
v := make([]byte, len(value))
copy(v, value)
l.Value = v
}
labels[len(labels)-1] = l

if len(line) == 0 {
return nil, "", fmt.Errorf("invalid labels: no closing }: %s", orig)
@@ -236,7 +325,7 @@ func parseName(line string) (string, string) {

func trimSpacePrefix(s string) string {
for i := 0; i < len(s); i++ {
if unicode.IsSpace(rune(s[i])) {
if s[i] == ' ' || s[i] == '\t' {
continue
}
return s[i:]
@@ -245,5 +334,7 @@ func trimSpacePrefix(s string) string {
}

func isSpace(c byte) bool {
return unicode.IsSpace(rune(c))
return c == ' ' || c == '\t'
}

var nameBytes = []byte("__name__")
6 changes: 3 additions & 3 deletions pkg/prompb/iterator_test.go
@@ -428,12 +428,12 @@ func BenchmarkIterator_TimeSeries(b *testing.B) {
sampleInput := `http_requests_total{method="post",code="200"} 1027 1395066363000`
iter := NewIterator(io.NopCloser(strings.NewReader(sampleInput)))
require.True(b, iter.Next())
ts := &TimeSeries{}
for i := 0; i < b.N; i++ {

_, err := iter.TimeSeries()
ts.Reset()
_, err := iter.TimeSeriesInto(ts)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
iter.Close()
}
}
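
Not part of this commit, but as a usage note: the zero-allocation result above could be spot-checked in the same package with testing.AllocsPerRun. The test name and placement are hypothetical, and it assumes (as the benchmark does) that Reset keeps label/sample capacity for reuse.

package prompb

import (
	"io"
	"strings"
	"testing"
)

// TestTimeSeriesInto_ZeroAlloc is a hypothetical companion to the benchmark:
// after one warm-up parse, re-parsing the same line into a reset TimeSeries
// should not allocate.
func TestTimeSeriesInto_ZeroAlloc(t *testing.T) {
	line := `http_requests_total{method="post",code="200"} 1027 1395066363000`
	iter := NewIterator(io.NopCloser(strings.NewReader(line)))
	if !iter.Next() {
		t.Fatal("expected one sample line")
	}
	ts := &TimeSeries{}
	if _, err := iter.TimeSeriesInto(ts); err != nil { // warm up label/sample capacity
		t.Fatal(err)
	}
	allocs := testing.AllocsPerRun(1000, func() {
		ts.Reset()
		if _, err := iter.TimeSeriesInto(ts); err != nil {
			t.Fatal(err)
		}
	})
	if allocs != 0 {
		t.Fatalf("expected 0 allocs per parse, got %v", allocs)
	}
}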
8 changes: 7 additions & 1 deletion pkg/prompb/protobuf.go
@@ -3,12 +3,18 @@ package prompb
import (
"fmt"
mathbits "math/bits"
"sync"

"github.com/VictoriaMetrics/easyproto"
)

var (
mp = &easyproto.MarshalerPool{}
mp = &easyproto.MarshalerPool{}
TimeSeriesPool = sync.Pool{
New: func() interface{} {
return &TimeSeries{}
},
}
)

// WriteRequest represents Prometheus remote write API request
22 changes: 22 additions & 0 deletions vendor/github.com/valyala/fastjson/LICENSE
