From 64d1ed53bb21ab7760831fd7d2f7933aabe61ef1 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 16 Sep 2024 12:32:53 -0600 Subject: [PATCH] Reduce allocations/cpu from scraping This is a followup to the prior streaming collection PR where we reduce the CPU and allocations incurred during scraping. benchmark old ns/op new ns/op delta BenchmarkIterator_TimeSeries-10 365 119 -67.39% benchmark old allocs new allocs delta BenchmarkIterator_TimeSeries-10 9 0 -100.00% benchmark old bytes new bytes delta BenchmarkIterator_TimeSeries-10 480 0 -100.00% --- collector/otlp/metrics.go | 42 +- collector/scraper.go | 40 +- collector/scraper_test.go | 12 +- collector/series.go | 12 +- go.mod | 1 + go.sum | 2 + ingestor/cluster/coordinator.go | 6 +- ingestor/metrics/handler.go | 8 +- ingestor/metrics/handler_test.go | 30 +- ingestor/service_test.go | 2 +- ingestor/storage/store.go | 6 +- ingestor/storage/store_test.go | 22 +- ingestor/transform/csv.go | 2 +- ingestor/transform/csv_test.go | 18 +- ingestor/transform/transformer.go | 30 +- ingestor/transform/transformer_test.go | 130 ++--- pkg/prompb/iterator.go | 134 +++-- pkg/prompb/iterator_test.go | 146 ++--- pkg/prompb/protobuf.go | 89 ++- pkg/prompb/protobuf_test.go | 12 +- pkg/prompb/sort.go | 4 +- pkg/prompb/sort_test.go | 4 +- pkg/prompb/util.go | 2 +- pkg/promremote/proxy.go | 20 +- tools/data/metric.go | 50 +- vendor/github.com/valyala/fastjson/LICENSE | 22 + .../valyala/fastjson/fastfloat/parse.go | 515 ++++++++++++++++++ vendor/modules.txt | 3 + 28 files changed, 1019 insertions(+), 345 deletions(-) create mode 100644 vendor/github.com/valyala/fastjson/LICENSE create mode 100644 vendor/github.com/valyala/fastjson/fastfloat/parse.go diff --git a/collector/otlp/metrics.go b/collector/otlp/metrics.go index 007dd77d..816a797f 100644 --- a/collector/otlp/metrics.go +++ b/collector/otlp/metrics.go @@ -164,7 +164,7 @@ func (t *OltpMetricWriter) Write(ctx context.Context, msg *v1.ExportMetricsServi return nil } -func (t *OltpMetricWriter) addSeriesAndFlushIfNecessary(ctx context.Context, wr *prompb.WriteRequest, series prompb.TimeSeries) error { +func (t *OltpMetricWriter) addSeriesAndFlushIfNecessary(ctx context.Context, wr *prompb.WriteRequest, series *prompb.TimeSeries) error { series = t.requestTransformer.TransformTimeSeries(series) wr.Timeseries = append(wr.Timeseries, series) if len(wr.Timeseries) >= t.maxBatchSize { @@ -226,7 +226,7 @@ func (t *OltpMetricWriter) sendBatch(ctx context.Context, wr *prompb.WriteReques func (t *OltpMetricWriter) addOltpNumberPoints(ctx context.Context, name string, points []*metricsv1.NumberDataPoint, wr *prompb.WriteRequest) error { for _, point := range points { series := newSeries(name, point.Attributes) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: unixNanoToUnixMillis(point.TimeUnixNano), Value: asFloat64(point), @@ -245,7 +245,7 @@ func (t *OltpMetricWriter) addOltpHistogramPoints(ctx context.Context, name stri // Add count series series := newSeries(fmt.Sprintf("%s_count", name), point.Attributes) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: float64(point.Count), @@ -258,7 +258,7 @@ func (t *OltpMetricWriter) addOltpHistogramPoints(ctx context.Context, name stri // Add sum series, if present if point.Sum != nil { series := newSeries(fmt.Sprintf("%s_sum", name), point.Attributes) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: float64(*point.Sum), @@ -282,11 +282,11 @@ func (t *OltpMetricWriter) addOltpHistogramPoints(ctx context.Context, name stri } series := newSeries(fmt.Sprintf("%s_bucket", name), point.Attributes) - series.Labels = append(series.Labels, prompb.Label{ + series.Labels = append(series.Labels, &prompb.Label{ Name: []byte("le"), Value: []byte(upperBound), }) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: float64(point.BucketCounts[i]), @@ -312,7 +312,7 @@ func (t *OltpMetricWriter) addOltpExpHistogramPoints(ctx context.Context, name s // Add count series series := newSeries(fmt.Sprintf("%s_count", name), point.Attributes) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: float64(point.Count), @@ -325,7 +325,7 @@ func (t *OltpMetricWriter) addOltpExpHistogramPoints(ctx context.Context, name s // Add sum series, if present if point.Sum != nil { series := newSeries(fmt.Sprintf("%s_sum", name), point.Attributes) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: float64(*point.Sum), @@ -349,11 +349,11 @@ func (t *OltpMetricWriter) addOltpExpHistogramPoints(ctx context.Context, name s upperBound := fmt.Sprintf("%f", math.Pow(-base, float64(bucketIdx))) series := newSeries(fmt.Sprintf("%s_bucket", name), point.Attributes) - series.Labels = append(series.Labels, prompb.Label{ + series.Labels = append(series.Labels, &prompb.Label{ Name: []byte("le"), Value: []byte(upperBound), }) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: float64(buckets.BucketCounts[i]), @@ -376,11 +376,11 @@ func (t *OltpMetricWriter) addOltpExpHistogramPoints(ctx context.Context, name s upperBound := fmt.Sprintf("%f", math.Pow(base, float64(bucketIdx+1))) series := newSeries(fmt.Sprintf("%s_bucket", name), point.Attributes) - series.Labels = append(series.Labels, prompb.Label{ + series.Labels = append(series.Labels, &prompb.Label{ Name: []byte("le"), Value: []byte(upperBound), }) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: float64(buckets.BucketCounts[i]), @@ -401,7 +401,7 @@ func (t *OltpMetricWriter) addOltpSummaryPoints(ctx context.Context, name string // Add count series series := newSeries(fmt.Sprintf("%s_count", name), point.Attributes) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: float64(point.Count), @@ -412,7 +412,7 @@ func (t *OltpMetricWriter) addOltpSummaryPoints(ctx context.Context, name string } series = newSeries(fmt.Sprintf("%s_sum", name), point.Attributes) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: point.Sum, @@ -425,11 +425,11 @@ func (t *OltpMetricWriter) addOltpSummaryPoints(ctx context.Context, name string // Add quantile series for _, quantile := range point.QuantileValues { series := newSeries(name, point.Attributes) - series.Labels = append(series.Labels, prompb.Label{ + series.Labels = append(series.Labels, &prompb.Label{ Name: []byte("quantile"), Value: []byte(fmt.Sprintf("%f", quantile.Quantile)), }) - series.Samples = []prompb.Sample{ + series.Samples = []*prompb.Sample{ { Timestamp: timestamp, Value: quantile.Value, @@ -443,18 +443,18 @@ func (t *OltpMetricWriter) addOltpSummaryPoints(ctx context.Context, name string return nil } -func newSeries(name string, attributes []*commonv1.KeyValue) prompb.TimeSeries { - ts := prompb.TimeSeries{ - Labels: make([]prompb.Label, 0, len(attributes)+1), +func newSeries(name string, attributes []*commonv1.KeyValue) *prompb.TimeSeries { + ts := &prompb.TimeSeries{ + Labels: make([]*prompb.Label, 0, len(attributes)+1), } - ts.Labels = append(ts.Labels, prompb.Label{ + ts.Labels = append(ts.Labels, &prompb.Label{ Name: []byte("__name__"), Value: []byte(name), }) for _, l := range attributes { - ts.Labels = append(ts.Labels, prompb.Label{ + ts.Labels = append(ts.Labels, &prompb.Label{ Name: []byte(l.Key), Value: []byte(l.Value.String()), }) diff --git a/collector/scraper.go b/collector/scraper.go index 8a27f9b4..01bf652f 100644 --- a/collector/scraper.go +++ b/collector/scraper.go @@ -122,6 +122,8 @@ type Scraper struct { mu sync.RWMutex targets map[string]ScrapeTarget + + wr *prompb.WriteRequest } func NewScraper(opts *ScraperOpts) *Scraper { @@ -214,17 +216,21 @@ func (s *Scraper) scrapeTargets(ctx context.Context) { continue } for iter.Next() { - ts, err := iter.TimeSeries() + pt := prompb.TimeSeriesPool.Get().(*prompb.TimeSeries) + pt.Reset() + ts, err := iter.TimeSeriesInto(pt) if err != nil { - logger.Errorf("Failed to get value: %s", err.Error()) + logger.Errorf("Failed to parse series %s: %s", target.Addr, err.Error()) continue } name := prompb.MetricName(ts) if s.requestTransformer.ShouldDropMetric(ts, name) { + prompb.TimeSeriesPool.Put(ts) metrics.MetricsDroppedTotal.WithLabelValues(string(name)).Add(1) continue } + for i, s := range ts.Samples { if s.Timestamp == 0 { s.Timestamp = scrapeTime @@ -233,25 +239,17 @@ func (s *Scraper) scrapeTargets(ctx context.Context) { } if target.Namespace != "" { - ts.Labels = append(ts.Labels, prompb.Label{ - Name: []byte("adxmon_namespace"), - Value: []byte(target.Namespace), - }) + ts.AppendLabelString("adxmon_namespace", target.Namespace) } if target.Pod != "" { - ts.Labels = append(ts.Labels, prompb.Label{ - Name: []byte("adxmon_pod"), - Value: []byte(target.Pod), - }) + ts.AppendLabelString("adxmon_pod", target.Pod) } if target.Container != "" { - ts.Labels = append(ts.Labels, prompb.Label{ - Name: []byte("adxmon_container"), - Value: []byte(target.Container), - }) + ts.AppendLabelString("adxmon_container", target.Container) } + prompb.Sort(ts.Labels) ts = s.requestTransformer.TransformTimeSeries(ts) @@ -276,18 +274,22 @@ func (s *Scraper) scrapeTargets(ctx context.Context) { } func (s *Scraper) flushBatchIfNecessary(ctx context.Context, wr *prompb.WriteRequest) *prompb.WriteRequest { - filtered := *wr + filtered := wr if len(filtered.Timeseries) >= s.opts.MaxBatchSize { - filtered = s.requestTransformer.TransformWriteRequest(filtered) + filtered = s.requestTransformer.TransformWriteRequest(wr) } if len(filtered.Timeseries) >= s.opts.MaxBatchSize { - if err := s.sendBatch(ctx, &filtered); err != nil { + if err := s.sendBatch(ctx, filtered); err != nil { logger.Errorf(err.Error()) } + for i := range filtered.Timeseries { + ts := filtered.Timeseries[i] + prompb.TimeSeriesPool.Put(ts) + } filtered.Timeseries = filtered.Timeseries[:0] } - return &filtered + return filtered } func (s *Scraper) sendBatch(ctx context.Context, wr *prompb.WriteRequest) error { @@ -596,3 +598,5 @@ func getTargetAnnotationMapOrDefault(p *v1.Pod, key string, defaultVal map[strin } return parsedMap } + +var adxmonNamespaceLabel = []byte("adxmon_namespace") diff --git a/collector/scraper_test.go b/collector/scraper_test.go index 07189fb3..b1c70868 100644 --- a/collector/scraper_test.go +++ b/collector/scraper_test.go @@ -40,12 +40,12 @@ func TestScraper_sendBatch(t *testing.T) { RemoteClient: &fakeClient{expectedSamples: 1}, }, writeRequest: &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ {Name: []byte("testLabel"), Value: []byte("testValue")}, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ {Value: 1, Timestamp: 123456789}, }, }, @@ -60,12 +60,12 @@ func TestScraper_sendBatch(t *testing.T) { RemoteClient: &fakeClient{expectedSamples: 0}, }, writeRequest: &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ {Name: []byte("testLabel"), Value: []byte("testValue")}, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ {Value: 1, Timestamp: 123456789}, }, }, diff --git a/collector/series.go b/collector/series.go index 3f22cc38..f7292f1a 100644 --- a/collector/series.go +++ b/collector/series.go @@ -9,30 +9,30 @@ type seriesCreator struct{} func (s *seriesCreator) newSeries(name string, scrapeTarget ScrapeTarget, m *io_prometheus_client.Metric) prompb.TimeSeries { ts := prompb.TimeSeries{ - Labels: make([]prompb.Label, 0, len(m.Label)+3), + Labels: make([]*prompb.Label, 0, len(m.Label)+3), } - ts.Labels = append(ts.Labels, prompb.Label{ + ts.Labels = append(ts.Labels, &prompb.Label{ Name: []byte("__name__"), Value: []byte(name), }) if scrapeTarget.Namespace != "" { - ts.Labels = append(ts.Labels, prompb.Label{ + ts.Labels = append(ts.Labels, &prompb.Label{ Name: []byte("adxmon_namespace"), Value: []byte(scrapeTarget.Namespace), }) } if scrapeTarget.Pod != "" { - ts.Labels = append(ts.Labels, prompb.Label{ + ts.Labels = append(ts.Labels, &prompb.Label{ Name: []byte("adxmon_pod"), Value: []byte(scrapeTarget.Pod), }) } if scrapeTarget.Container != "" { - ts.Labels = append(ts.Labels, prompb.Label{ + ts.Labels = append(ts.Labels, &prompb.Label{ Name: []byte("adxmon_container"), Value: []byte(scrapeTarget.Container), }) @@ -43,7 +43,7 @@ func (s *seriesCreator) newSeries(name string, scrapeTarget ScrapeTarget, m *io_ continue } - ts.Labels = append(ts.Labels, prompb.Label{ + ts.Labels = append(ts.Labels, &prompb.Label{ Name: []byte(l.GetName()), Value: []byte(l.GetValue()), }) diff --git a/go.mod b/go.mod index 8b1eaef0..3424ab49 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/tenebris-tech/tail v1.0.5 github.com/traefik/yaegi v0.15.1 github.com/urfave/cli/v2 v2.27.4 + github.com/valyala/fastjson v1.6.4 golang.org/x/net v0.29.0 golang.org/x/sync v0.8.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 diff --git a/go.sum b/go.sum index 3080cd5e..8075924e 100644 --- a/go.sum +++ b/go.sum @@ -201,6 +201,8 @@ github.com/traefik/yaegi v0.15.1 h1:YA5SbaL6HZA0Exh9T/oArRHqGN2HQ+zgmCY7dkoTXu4= github.com/traefik/yaegi v0.15.1/go.mod h1:AVRxhaI2G+nUsaM1zyktzwXn69G3t/AuTDrCiTds9p0= github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8= github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ= +github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ= +github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= diff --git a/ingestor/cluster/coordinator.go b/ingestor/cluster/coordinator.go index 1c42c195..e2fc84d6 100644 --- a/ingestor/cluster/coordinator.go +++ b/ingestor/cluster/coordinator.go @@ -21,7 +21,7 @@ import ( v12 "k8s.io/client-go/listers/core/v1" ) -type TimeSeriesWriter func(ctx context.Context, ts []prompb.TimeSeries) error +type TimeSeriesWriter func(ctx context.Context, ts []*prompb.TimeSeries) error type OTLPLogsWriter func(ctx context.Context, database, table string, logs *otlp.Logs) error @@ -33,7 +33,7 @@ type Coordinator interface { IsLeader() bool // Write writes the time series to the correct peer. - Write(ctx context.Context, wr prompb.WriteRequest) error + Write(ctx context.Context, wr *prompb.WriteRequest) error // WriteOTLPLogs writes the logs to the correct peer. WriteOTLPLogs(ctx context.Context, database, table string, logs *otlp.Logs) error @@ -229,7 +229,7 @@ func (c *coordinator) Close() error { return nil } -func (c *coordinator) Write(ctx context.Context, wr prompb.WriteRequest) error { +func (c *coordinator) Write(ctx context.Context, wr *prompb.WriteRequest) error { return c.tsw(ctx, wr.Timeseries) } diff --git a/ingestor/metrics/handler.go b/ingestor/metrics/handler.go index a833cffb..a71e1929 100644 --- a/ingestor/metrics/handler.go +++ b/ingestor/metrics/handler.go @@ -25,7 +25,7 @@ type SeriesCounter interface { type RequestWriter interface { // Write writes the time series to the correct peer. - Write(ctx context.Context, wr prompb.WriteRequest) error + Write(ctx context.Context, wr *prompb.WriteRequest) error } type HealthChecker interface { @@ -37,7 +37,7 @@ type HandlerOpts struct { Path string RequestTransformer interface { - TransformWriteRequest(req prompb.WriteRequest) prompb.WriteRequest + TransformWriteRequest(req *prompb.WriteRequest) *prompb.WriteRequest } // RequestWriter is the interface that writes the time series to a destination. @@ -61,7 +61,7 @@ type Handler struct { DropMetrics []*regexp.Regexp requestTransformer interface { - TransformWriteRequest(req prompb.WriteRequest) prompb.WriteRequest + TransformWriteRequest(req *prompb.WriteRequest) *prompb.WriteRequest } requestWriter RequestWriter @@ -134,7 +134,7 @@ func (s *Handler) HandleReceive(w http.ResponseWriter, r *http.Request) { } // Note: this cause allocations, but holding onto them in a pool causes a lot of memory to be used over time. - var req prompb.WriteRequest + req := &prompb.WriteRequest{} if err := req.Unmarshal(reqBuf); err != nil { m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc() http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/ingestor/metrics/handler_test.go b/ingestor/metrics/handler_test.go index 426fb326..ce4db1df 100644 --- a/ingestor/metrics/handler_test.go +++ b/ingestor/metrics/handler_test.go @@ -14,17 +14,17 @@ import ( ) type fakeRequestWriter struct { - fn func(ctx context.Context, wr prompb.WriteRequest) error + fn func(ctx context.Context, wr *prompb.WriteRequest) error } -func (f *fakeRequestWriter) Write(ctx context.Context, wr prompb.WriteRequest) error { +func (f *fakeRequestWriter) Write(ctx context.Context, wr *prompb.WriteRequest) error { return f.fn(ctx, wr) } func TestHandler_HandleReceive(t *testing.T) { var called bool writer := &fakeRequestWriter{ - fn: func(ctx context.Context, wr prompb.WriteRequest) error { + fn: func(ctx context.Context, wr *prompb.WriteRequest) error { require.Equal(t, 1, len(wr.Timeseries)) called = true return nil @@ -38,16 +38,16 @@ func TestHandler_HandleReceive(t *testing.T) { Database: "adxmetrics", }) - wr := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + wr := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("foo"), Value: []byte("bar"), }, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ { Value: 1, Timestamp: 1, @@ -76,7 +76,7 @@ func TestHandler_HandleReceive(t *testing.T) { func TestHandler_HandleReceive_Unhealthy(t *testing.T) { var called bool writer := &fakeRequestWriter{ - fn: func(ctx context.Context, wr prompb.WriteRequest) error { + fn: func(ctx context.Context, wr *prompb.WriteRequest) error { require.Equal(t, 1, len(wr.Timeseries)) called = true return nil @@ -93,15 +93,15 @@ func TestHandler_HandleReceive_Unhealthy(t *testing.T) { }) wr := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("foo"), Value: []byte("bar"), }, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ { Value: 1, Timestamp: 1, @@ -130,7 +130,7 @@ func TestHandler_HandleReceive_Unhealthy(t *testing.T) { func TestHandler_HandleReceive_AllowedDBs(t *testing.T) { var called bool writer := &fakeRequestWriter{ - fn: func(ctx context.Context, wr prompb.WriteRequest) error { + fn: func(ctx context.Context, wr *prompb.WriteRequest) error { require.Equal(t, 0, len(wr.Timeseries)) called = true return nil @@ -147,15 +147,15 @@ func TestHandler_HandleReceive_AllowedDBs(t *testing.T) { }) wr := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("foo"), Value: []byte("bar"), }, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ { Value: 1, Timestamp: 1, diff --git a/ingestor/service_test.go b/ingestor/service_test.go index 8c5719ea..e95eaf56 100644 --- a/ingestor/service_test.go +++ b/ingestor/service_test.go @@ -93,7 +93,7 @@ func (f fakeStore) Close() error { panic("implement me") } -func (f fakeStore) WriteTimeSeries(ctx context.Context, ts []prompb.TimeSeries) error { +func (f fakeStore) WriteTimeSeries(ctx context.Context, ts []*prompb.TimeSeries) error { panic("implement me") } diff --git a/ingestor/storage/store.go b/ingestor/storage/store.go index a57eec42..ad5a09dc 100644 --- a/ingestor/storage/store.go +++ b/ingestor/storage/store.go @@ -36,7 +36,7 @@ type Store interface { service.Component // WriteTimeSeries writes a batch of time series to the Store. - WriteTimeSeries(ctx context.Context, ts []prompb.TimeSeries) error + WriteTimeSeries(ctx context.Context, ts []*prompb.TimeSeries) error // WriteOTLPLogs writes a batch of logs to the Store. WriteOTLPLogs(ctx context.Context, database, table string, logs *otlp.Logs) error @@ -97,7 +97,7 @@ func (s *LocalStore) WALCount() int { return s.repository.Count() } -func (s *LocalStore) WriteTimeSeries(ctx context.Context, ts []prompb.TimeSeries) error { +func (s *LocalStore) WriteTimeSeries(ctx context.Context, ts []*prompb.TimeSeries) error { enc := csvWriterPool.Get(8 * 1024).(*transform.CSVWriter) defer csvWriterPool.Put(enc) enc.InitColumns(s.opts.LiftedColumns) @@ -303,7 +303,7 @@ func (s *LocalStore) Index() *wal.Index { return s.repository.Index() } -func SegmentKey(dst []byte, labels []prompb.Label) ([]byte, error) { +func SegmentKey(dst []byte, labels []*prompb.Label) ([]byte, error) { var name, database []byte for _, v := range labels { if bytes.Equal(v.Name, []byte("adxmon_database")) { diff --git a/ingestor/storage/store_test.go b/ingestor/storage/store_test.go index 11672f14..b80915a0 100644 --- a/ingestor/storage/store_test.go +++ b/ingestor/storage/store_test.go @@ -26,7 +26,7 @@ import ( func TestSeriesKey(t *testing.T) { tests := []struct { Database []byte - Labels []prompb.Label + Labels []*prompb.Label Expect []byte }{ { @@ -68,21 +68,21 @@ func TestStore_Open(t *testing.T) { w, err := s.GetWAL(ctx, key) require.NoError(t, err) require.NotNil(t, w) - require.NoError(t, s.WriteTimeSeries(context.Background(), []prompb.TimeSeries{ts})) + require.NoError(t, s.WriteTimeSeries(context.Background(), []*prompb.TimeSeries{ts})) ts = newTimeSeries("foo", map[string]string{"adxmon_database": database}, 1, 1) key1, err := storage.SegmentKey(b[:0], ts.Labels) w, err = s.GetWAL(ctx, key1) require.NoError(t, err) require.NotNil(t, w) - require.NoError(t, s.WriteTimeSeries(context.Background(), []prompb.TimeSeries{ts})) + require.NoError(t, s.WriteTimeSeries(context.Background(), []*prompb.TimeSeries{ts})) ts = newTimeSeries("bar", map[string]string{"adxmon_database": database}, 0, 0) key2, err := storage.SegmentKey(b[:0], ts.Labels) w, err = s.GetWAL(ctx, key2) require.NoError(t, err) require.NotNil(t, w) - require.NoError(t, s.WriteTimeSeries(context.Background(), []prompb.TimeSeries{ts})) + require.NoError(t, s.WriteTimeSeries(context.Background(), []*prompb.TimeSeries{ts})) path := w.Path() @@ -126,7 +126,7 @@ func TestStore_WriteTimeSeries(t *testing.T) { w, err := s.GetWAL(ctx, key) require.NoError(t, err) require.NotNil(t, w) - require.NoError(t, s.WriteTimeSeries(context.Background(), []prompb.TimeSeries{ts})) + require.NoError(t, s.WriteTimeSeries(context.Background(), []*prompb.TimeSeries{ts})) path := w.Path() @@ -364,7 +364,7 @@ func BenchmarkWriteTimeSeries(b *testing.B) { defer s.Close() require.Equal(b, 0, s.WALCount()) - batch := make([]prompb.TimeSeries, 2500) + batch := make([]*prompb.TimeSeries, 2500) for i := 0; i < 2500; i++ { batch[i] = newTimeSeries(fmt.Sprintf("metric%d", i%100), nil, 0, 0) } @@ -373,24 +373,24 @@ func BenchmarkWriteTimeSeries(b *testing.B) { } } -func newTimeSeries(name string, labels map[string]string, ts int64, val float64) prompb.TimeSeries { - l := []prompb.Label{ +func newTimeSeries(name string, labels map[string]string, ts int64, val float64) *prompb.TimeSeries { + l := []*prompb.Label{ { Name: []byte("__name__"), Value: []byte(name), }, } for k, v := range labels { - l = append(l, prompb.Label{Name: []byte(k), Value: []byte(v)}) + l = append(l, &prompb.Label{Name: []byte(k), Value: []byte(v)}) } sort.Slice(l, func(i, j int) bool { return bytes.Compare(l[i].Name, l[j].Name) < 0 }) - return prompb.TimeSeries{ + return &prompb.TimeSeries{ Labels: l, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ { Timestamp: ts, Value: val, diff --git a/ingestor/transform/csv.go b/ingestor/transform/csv.go index 74979640..318a9101 100644 --- a/ingestor/transform/csv.go +++ b/ingestor/transform/csv.go @@ -57,7 +57,7 @@ func NewCSVWriter(w *bytes.Buffer, columns []string) *CSVWriter { return writer } -func (w *CSVWriter) MarshalTS(ts prompb.TimeSeries) error { +func (w *CSVWriter) MarshalTS(ts *prompb.TimeSeries) error { buf := w.labelsBuf buf.Reset() diff --git a/ingestor/transform/csv_test.go b/ingestor/transform/csv_test.go index d3853197..ac53d1ce 100644 --- a/ingestor/transform/csv_test.go +++ b/ingestor/transform/csv_test.go @@ -17,8 +17,8 @@ import ( ) func TestMarshalCSV(t *testing.T) { - ts := prompb.TimeSeries{ - Labels: []prompb.Label{ + ts := &prompb.TimeSeries{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("__redis__"), @@ -37,7 +37,7 @@ func TestMarshalCSV(t *testing.T) { }, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ { Timestamp: 1669112524001, Value: 0, @@ -65,8 +65,8 @@ func TestMarshalCSV(t *testing.T) { } func BenchmarkMarshalCSV(b *testing.B) { - ts := prompb.TimeSeries{ - Labels: []prompb.Label{ + ts := &prompb.TimeSeries{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("__redis__"), @@ -85,7 +85,7 @@ func BenchmarkMarshalCSV(b *testing.B) { }, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ { Timestamp: 1669112524001, Value: 0, @@ -112,8 +112,8 @@ func BenchmarkMarshalCSV(b *testing.B) { } func TestMarshalCSV_LiftLabel(t *testing.T) { - ts := prompb.TimeSeries{ - Labels: []prompb.Label{ + ts := &prompb.TimeSeries{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("__redis__"), @@ -132,7 +132,7 @@ func TestMarshalCSV_LiftLabel(t *testing.T) { }, }, - Samples: []prompb.Sample{ + Samples: []*prompb.Sample{ { Timestamp: 1669112524001, Value: 0, diff --git a/ingestor/transform/transformer.go b/ingestor/transform/transformer.go index f8f18331..5244d944 100644 --- a/ingestor/transform/transformer.go +++ b/ingestor/transform/transformer.go @@ -4,11 +4,11 @@ import ( "bytes" "fmt" "regexp" - "sort" "sync" "github.com/Azure/adx-mon/metrics" "github.com/Azure/adx-mon/pkg/prompb" + "github.com/davecgh/go-spew/spew" ) type RequestTransformer struct { @@ -34,7 +34,7 @@ type RequestTransformer struct { // AddLabels is a map of label names to label values that will be added to all metrics. AddLabels map[string]string - addLabels []prompb.Label + addLabels []*prompb.Label // AllowedDatabase is a map of database names that are allowed to be written to. AllowedDatabase map[string]struct{} @@ -44,7 +44,7 @@ type RequestTransformer struct { func (f *RequestTransformer) init() { f.initOnce.Do(func() { - addLabelsSlice := make([]prompb.Label, 0, len(f.AddLabels)) + addLabelsSlice := make([]*prompb.Label, 0, len(f.AddLabels)) if f.DropLabels == nil { f.DropLabels = make(map[*regexp.Regexp]*regexp.Regexp) } @@ -54,7 +54,7 @@ func (f *RequestTransformer) init() { } for k, v := range f.AddLabels { - addLabelsSlice = append(addLabelsSlice, prompb.Label{ + addLabelsSlice = append(addLabelsSlice, &prompb.Label{ Name: []byte(k), Value: []byte(v), }) @@ -65,7 +65,7 @@ func (f *RequestTransformer) init() { }) } -func (f *RequestTransformer) TransformWriteRequest(req prompb.WriteRequest) prompb.WriteRequest { +func (f *RequestTransformer) TransformWriteRequest(req *prompb.WriteRequest) *prompb.WriteRequest { f.init() var i int for j := range req.Timeseries { @@ -101,7 +101,14 @@ func (f *RequestTransformer) TransformWriteRequest(req prompb.WriteRequest) prom return req } -func (f *RequestTransformer) TransformTimeSeries(v prompb.TimeSeries) prompb.TimeSeries { +func (f *RequestTransformer) TransformTimeSeries(v *prompb.TimeSeries) *prompb.TimeSeries { + for _, l := range v.Labels { + if len(l.Name) == 0 { + spew.Dump(v) + panic("empty label name transform") + } + } + f.init() // If labels are configured to be dropped, filter them next. var ( @@ -146,14 +153,19 @@ func (f *RequestTransformer) TransformTimeSeries(v prompb.TimeSeries) prompb.Tim i++ } v.Labels = v.Labels[:i] - v.Labels = append(v.Labels, f.addLabels...) + for _, ll := range f.addLabels { + l := &prompb.Label{} + l.Name = append(l.Name[:0], ll.Name...) + l.Value = append(l.Value[:0], ll.Value...) + v.Labels = append(v.Labels, l) + } - sort.Sort(prompb.Labels(v.Labels)) + prompb.Sort(v.Labels) return v } -func (f *RequestTransformer) ShouldDropMetric(v prompb.TimeSeries, name []byte) bool { +func (f *RequestTransformer) ShouldDropMetric(v *prompb.TimeSeries, name []byte) bool { if f.DefaultDropMetrics { // Explicitly dropped metrics take precedence over explicitly kept metrics. for _, r := range f.DropMetrics { diff --git a/ingestor/transform/transformer_test.go b/ingestor/transform/transformer_test.go index 29d9c032..9dafa9aa 100644 --- a/ingestor/transform/transformer_test.go +++ b/ingestor/transform/transformer_test.go @@ -14,10 +14,10 @@ func TestRequestTransformer_TransformWriteRequest_DropMetrics(t *testing.T) { regexp.MustCompile("cpu"), }} - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -25,7 +25,7 @@ func TestRequestTransformer_TransformWriteRequest_DropMetrics(t *testing.T) { }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -44,10 +44,10 @@ func TestRequestTransformer_TransformWriteRequest_DropMetricsRegex(t *testing.T) regexp.MustCompile("cpu|mem"), }} - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -55,7 +55,7 @@ func TestRequestTransformer_TransformWriteRequest_DropMetricsRegex(t *testing.T) }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -63,7 +63,7 @@ func TestRequestTransformer_TransformWriteRequest_DropMetricsRegex(t *testing.T) }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("net"), @@ -84,10 +84,10 @@ func TestRequestTransformer_TransformWriteRequest_DropLabels(t *testing.T) { }, } - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -99,7 +99,7 @@ func TestRequestTransformer_TransformWriteRequest_DropLabels(t *testing.T) { }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -134,10 +134,10 @@ func TestRequestTransformer_TransformWriteRequest_SkipNameLabel(t *testing.T) { }, } - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -149,7 +149,7 @@ func TestRequestTransformer_TransformWriteRequest_SkipNameLabel(t *testing.T) { }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -189,8 +189,8 @@ func TestRequestTransformer_TransformTimeSeries_AddLabels(t *testing.T) { }, } - ts := prompb.TimeSeries{ - Labels: []prompb.Label{ + ts := &prompb.TimeSeries{ + Labels: []*prompb.Label{ { Name: []byte("region"), Value: []byte("eastus"), @@ -223,10 +223,10 @@ func TestRequestTransformer_TransformWriteRequest_AllowedDatabases(t *testing.T) AllowedDatabase: map[string]struct{}{"foo": {}}, } - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -234,7 +234,7 @@ func TestRequestTransformer_TransformWriteRequest_AllowedDatabases(t *testing.T) }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -255,10 +255,10 @@ func TestRequestTransformer_TransformWriteRequest_AllowedDatabases(t *testing.T) func TestRequestTransformer_TransformWriteRequest_DefaultDropMetrics(t *testing.T) { f := &transform.RequestTransformer{DefaultDropMetrics: true} - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -266,7 +266,7 @@ func TestRequestTransformer_TransformWriteRequest_DefaultDropMetrics(t *testing. }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -274,7 +274,7 @@ func TestRequestTransformer_TransformWriteRequest_DefaultDropMetrics(t *testing. }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("net"), @@ -294,10 +294,10 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetrics(t *testing.T) { KeepMetrics: []*regexp.Regexp{regexp.MustCompile("cpu|mem")}, } - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -305,7 +305,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetrics(t *testing.T) { }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -313,7 +313,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetrics(t *testing.T) { }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("net"), @@ -341,10 +341,10 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsAndDrop(t *testing. DropMetrics: []*regexp.Regexp{regexp.MustCompile("^mem_load")}, } - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -352,7 +352,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsAndDrop(t *testing. }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -360,7 +360,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsAndDrop(t *testing. }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("net"), @@ -368,7 +368,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsAndDrop(t *testing. }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem_load"), @@ -394,10 +394,10 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsWithLabelValue(t *t }, } - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -409,7 +409,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsWithLabelValue(t *t }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -421,7 +421,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsWithLabelValue(t *t }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("net"), @@ -447,10 +447,10 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsAndDropLabelValue(t DropMetrics: []*regexp.Regexp{regexp.MustCompile("mem")}, } - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -462,7 +462,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsAndDropLabelValue(t }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -474,7 +474,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsAndDropLabelValue(t }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("disk"), @@ -501,10 +501,10 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsWithLabelValueDropL }, } - req := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ + req := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -520,7 +520,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsWithLabelValueDropL }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("mem"), @@ -532,7 +532,7 @@ func TestRequestTransformer_TransformWriteRequest_KeepMetricsWithLabelValueDropL }, }, { - Labels: []prompb.Label{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("net"), @@ -573,8 +573,8 @@ func BenchmarkRequestTransformer_TransformWriteRequest(b *testing.B) { "adxmon_container": "container", }} - ts := prompb.TimeSeries{ - Labels: []prompb.Label{ + ts := &prompb.TimeSeries{ + Labels: []*prompb.Label{ { Name: []byte("__name__"), Value: []byte("cpu"), @@ -582,8 +582,8 @@ func BenchmarkRequestTransformer_TransformWriteRequest(b *testing.B) { }, } - wr := prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ts}, + wr := &prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ts}, } b.ResetTimer() @@ -594,7 +594,7 @@ func BenchmarkRequestTransformer_TransformWriteRequest(b *testing.B) { func TestRequestTransformer_ShouldDropMetric(t *testing.T) { type args struct { - v prompb.TimeSeries + v *prompb.TimeSeries name []byte } tests := []struct { @@ -611,7 +611,7 @@ func TestRequestTransformer_ShouldDropMetric(t *testing.T) { }, args: args{ name: []byte("metric"), - v: prompb.TimeSeries{}, + v: &prompb.TimeSeries{}, }, want: false, }, @@ -624,7 +624,7 @@ func TestRequestTransformer_ShouldDropMetric(t *testing.T) { }, args: args{ name: []byte("metric"), - v: prompb.TimeSeries{}, + v: &prompb.TimeSeries{}, }, want: true, }, @@ -638,7 +638,7 @@ func TestRequestTransformer_ShouldDropMetric(t *testing.T) { }, args: args{ name: []byte("metric"), - v: prompb.TimeSeries{Labels: []prompb.Label{ + v: &prompb.TimeSeries{Labels: []*prompb.Label{ {Name: []byte("labelname"), Value: []byte("value")}, }}, }, @@ -651,7 +651,7 @@ func TestRequestTransformer_ShouldDropMetric(t *testing.T) { }, args: args{ name: []byte("metric"), - v: prompb.TimeSeries{Labels: []prompb.Label{ + v: &prompb.TimeSeries{Labels: []*prompb.Label{ {Name: []byte("labelname"), Value: []byte("value")}, }}, }, @@ -665,7 +665,7 @@ func TestRequestTransformer_ShouldDropMetric(t *testing.T) { }, args: args{ name: []byte("metric"), - v: prompb.TimeSeries{}, + v: &prompb.TimeSeries{}, }, want: true, }, @@ -677,7 +677,7 @@ func TestRequestTransformer_ShouldDropMetric(t *testing.T) { }, args: args{ name: []byte("metric"), - v: prompb.TimeSeries{}, + v: &prompb.TimeSeries{}, }, want: false, }, diff --git a/pkg/prompb/iterator.go b/pkg/prompb/iterator.go index 3e160335..9f7809d6 100644 --- a/pkg/prompb/iterator.go +++ b/pkg/prompb/iterator.go @@ -6,19 +6,23 @@ import ( "io" "strconv" "strings" - "unicode" + + "github.com/valyala/fastjson/fastfloat" ) type Iterator struct { r io.ReadCloser scanner *bufio.Scanner current string + + buf []byte } func NewIterator(r io.ReadCloser) *Iterator { return &Iterator{ r: r, scanner: bufio.NewScanner(r), + buf: make([]byte, 0, 1024), } } @@ -34,6 +38,7 @@ func (i *Iterator) Next() bool { } return true } + return false } @@ -41,11 +46,22 @@ func (i *Iterator) Value() string { return i.current } -func (i *Iterator) TimeSeries() (TimeSeries, error) { +func (i *Iterator) TimeSeries() (*TimeSeries, error) { + if len(i.current) == 0 { + return nil, fmt.Errorf("no current value") + } + return i.ParseTimeSeries(i.current) +} + +func (i *Iterator) TimeSeriesInto(ts *TimeSeries) (*TimeSeries, error) { if len(i.current) == 0 { - return TimeSeries{}, fmt.Errorf("no current value") + return nil, fmt.Errorf("no current value") } - return ParseTimeSeries(i.current) + ts, err := i.ParseTimeSeriesInto(ts, i.current) + if err != nil { + return nil, fmt.Errorf("error parsing time series: %w: %s", err, i.current) + } + return ts, err } func (i *Iterator) Close() error { @@ -64,7 +80,7 @@ func (i *Iterator) Reset(r io.ReadCloser) { func (i *Iterator) isComment(s string) bool { for j := 0; j < len(s); j++ { - if unicode.IsSpace(rune(s[j])) { + if isSpace(s[j]) { continue } @@ -79,57 +95,87 @@ func (i *Iterator) isComment(s string) bool { func (i *Iterator) isSpace(s string) bool { for j := 0; j < len(s); j++ { - if !unicode.IsSpace(rune(s[j])) { + if !isSpace(s[j]) { return false } } return true } -func ParseTimeSeries(line string) (TimeSeries, error) { +func (i *Iterator) ParseTimeSeriesInto(ts *TimeSeries, line string) (*TimeSeries, error) { var ( name string err error ) + name, line = parseName(line) + + ts.AppendLabelString(lableName, name) + + ts, line, err = i.parseLabels(ts, line) + if err != nil { + return nil, err + } + + v, line := parseValue(line) + + t, line := parseTimestamp(line) + + value, err := fastfloat.Parse(v) + if err != nil { + return nil, fmt.Errorf("invalid value: %v", err) + } + + var timestamp int64 + if len(t) > 0 { + timestamp = fastfloat.ParseInt64BestEffort(t) + } + + ts.AppendSample(timestamp, value) + + return ts, nil +} + +func (i *Iterator) ParseTimeSeries(line string) (*TimeSeries, error) { + var ( + name string + err error + ) + ts := &TimeSeries{} + name, line = parseName(line) n := strings.Count(line, ",") - labels := make([]Label, 0, n+1) - labels = append(labels, Label{ + labels := make([]*Label, 0, n+1) + labels = append(labels, &Label{ Name: []byte("__name__"), Value: []byte(name), }) + ts.Labels = labels - labels, line, err = parseLabels(labels, line) + ts, line, err = i.parseLabels(ts, line) if err != nil { - return TimeSeries{}, err + return nil, err } v, line := parseValue(line) - ts, line := parseTimestamp(line) + tsStr, line := parseTimestamp(line) - value, err := strconv.ParseFloat(string(v), 64) + value, err := strconv.ParseFloat(v, 64) if err != nil { - return TimeSeries{}, fmt.Errorf("invalid value: %v", err) + return nil, fmt.Errorf("invalid value: %v", err) } var timestamp int64 - if len(ts) > 0 { - timestamp, err = strconv.ParseInt(string(ts), 10, 64) + if len(tsStr) > 0 { + timestamp, err = strconv.ParseInt(tsStr, 10, 64) if err != nil { - return TimeSeries{}, fmt.Errorf("invalid timestamp: %v", err) + return nil, fmt.Errorf("invalid timestamp: %v", err) } } - return TimeSeries{ - Labels: labels, - Samples: []Sample{ - { - Value: value, - Timestamp: timestamp, - }, - }, - }, nil + ts.AppendSample(timestamp, value) + + return ts, nil } func parseTimestamp(line string) (string, string) { @@ -152,21 +198,27 @@ func parseValue(line string) (string, string) { return line, "" } -func parseLabels(labels Labels, line string) (Labels, string, error) { +func (i *Iterator) parseLabels(ts *TimeSeries, line string) (*TimeSeries, string, error) { orig := line line = trimSpacePrefix(line) if len(line) == 0 { - return labels, "", nil + return nil, "", nil } if line[0] == '{' { line = line[1:] for len(line) > 0 { if line[0] == '}' { - return labels, line[1:], nil + return ts, line[1:], nil } - idx := strings.Index(line, "=") + var idx = -1 + for i := 0; i < len(line); i++ { + if line[i] == '=' { + idx = i + break + } + } if idx == -1 { return nil, "", fmt.Errorf("invalid label: no =: %s", orig) } @@ -177,7 +229,6 @@ func parseLabels(labels Labels, line string) (Labels, string, error) { } line = line[idx+1:] - if len(line) == 0 { return nil, "", fmt.Errorf("invalid label: no opening \": %s", orig) } @@ -185,7 +236,7 @@ func parseLabels(labels Labels, line string) (Labels, string, error) { line = line[1:] } - value := make([]byte, 0, 64) + value := i.buf[:0] var j int for j < len(line) { if line[j] == '\\' { @@ -211,17 +262,14 @@ func parseLabels(labels Labels, line string) (Labels, string, error) { j += 1 } - labels = append(labels, Label{ - Name: []byte(key), - Value: value, - }) + ts.AppendLabel([]byte(key), value) if len(line) == 0 { return nil, "", fmt.Errorf("invalid labels: no closing }: %s", orig) } if line[0] == '}' { - return labels, line[1:], nil + return ts, line[1:], nil } if line[0] == ',' { @@ -229,13 +277,13 @@ func parseLabels(labels Labels, line string) (Labels, string, error) { } } } - return labels, line, nil + return ts, line, nil } func parseName(line string) (string, string) { line = trimSpacePrefix(line) for i := 0; i < len(line); i++ { - if line[i] == '{' || unicode.IsSpace(rune(line[i])) { + if line[i] == '{' || isSpace(line[i]) { return line[:i], line[i:] } } @@ -244,7 +292,7 @@ func parseName(line string) (string, string) { func trimSpacePrefix(s string) string { for i := 0; i < len(s); i++ { - if unicode.IsSpace(rune(s[i])) { + if s[i] == ' ' || s[i] == '\t' { continue } return s[i:] @@ -253,5 +301,9 @@ func trimSpacePrefix(s string) string { } func isSpace(c byte) bool { - return unicode.IsSpace(rune(c)) + return c == ' ' || c == '\t' } + +const lableName = "__name__" + +var nameBytes = []byte(lableName) diff --git a/pkg/prompb/iterator_test.go b/pkg/prompb/iterator_test.go index e5371f0d..bf1358d6 100644 --- a/pkg/prompb/iterator_test.go +++ b/pkg/prompb/iterator_test.go @@ -136,278 +136,278 @@ func TestIterator_TimeSeries_Malformed(t *testing.T) { func TestIterator_TimeSeries(t *testing.T) { for _, c := range []struct { input string - want TimeSeries + want *TimeSeries }{ { input: `http_requests_total{method="post",code="200"} 1027 1395066363000`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_requests_total")}, {Name: []byte("method"), Value: []byte("post")}, {Name: []byte("code"), Value: []byte("200")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 1027, Timestamp: 1395066363000}, }, }, }, { input: `http_requests_total{method="post",code="400"} 3 1395066363000`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_requests_total")}, {Name: []byte("method"), Value: []byte("post")}, {Name: []byte("code"), Value: []byte("400")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 3, Timestamp: 1395066363000}, }, }, }, { input: `msdos_file_access_time_seconds{path="C:\\DIR\\FILE.TXT",error="Cannot find file:\n\"FILE.TXT\""} 1.458255915e9`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("msdos_file_access_time_seconds")}, {Name: []byte("path"), Value: []byte("C:\\DIR\\FILE.TXT")}, {Name: []byte("error"), Value: []byte("Cannot find file:\n\"FILE.TXT\"")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 1.458255915e9, Timestamp: 0}, }, }, }, { input: `metric_without_timestamp_and_labels 12.47`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("metric_without_timestamp_and_labels")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 12.47, Timestamp: 0}, }, }, }, { input: `something_weird{problem="division by zero"} +Inf -3982045`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("something_weird")}, {Name: []byte("problem"), Value: []byte("division by zero")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: math.Inf(1), Timestamp: -3982045}, }, }, }, { input: `http_request_duration_seconds_bucket{le="0.05"} 24054`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_request_duration_seconds_bucket")}, {Name: []byte("le"), Value: []byte("0.05")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 24054, Timestamp: 0}, }, }, }, { input: `http_request_duration_seconds_bucket{le="0.1"} 33444`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_request_duration_seconds_bucket")}, {Name: []byte("le"), Value: []byte("0.1")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 33444, Timestamp: 0}, }, }, }, { input: `http_request_duration_seconds_bucket{le="0.2"} 100392`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_request_duration_seconds_bucket")}, {Name: []byte("le"), Value: []byte("0.2")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 100392, Timestamp: 0}, }, }, }, { input: `http_request_duration_seconds_bucket{le="0.5"} 129389`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_request_duration_seconds_bucket")}, {Name: []byte("le"), Value: []byte("0.5")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 129389, Timestamp: 0}, }, }, }, { input: `http_request_duration_seconds_bucket{le="1"} 133988`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_request_duration_seconds_bucket")}, {Name: []byte("le"), Value: []byte("1")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 133988, Timestamp: 0}, }, }, }, { input: `http_request_duration_seconds_bucket{le="+Inf"} 144320`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_request_duration_seconds_bucket")}, {Name: []byte("le"), Value: []byte("+Inf")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 144320, Timestamp: 0}, }, }, }, { input: `http_request_duration_seconds_sum 53423`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_request_duration_seconds_sum")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 53423, Timestamp: 0}, }, }, }, { input: `http_request_duration_seconds_count 144320`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("http_request_duration_seconds_count")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 144320, Timestamp: 0}, }, }, }, { input: `rpc_duration_seconds{quantile="0.01"} 3102`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("rpc_duration_seconds")}, {Name: []byte("quantile"), Value: []byte("0.01")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 3102, Timestamp: 0}, }, }, }, { input: `rpc_duration_seconds{quantile="0.05"} 3272`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("rpc_duration_seconds")}, {Name: []byte("quantile"), Value: []byte("0.05")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 3272, Timestamp: 0}, }, }, }, { input: `rpc_duration_seconds{quantile="0.5"} 4773`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("rpc_duration_seconds")}, {Name: []byte("quantile"), Value: []byte("0.5")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 4773, Timestamp: 0}, }, }, }, { input: `rpc_duration_seconds{quantile="0.9"} 9001`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("rpc_duration_seconds")}, {Name: []byte("quantile"), Value: []byte("0.9")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 9001, Timestamp: 0}, }, }, }, { input: `rpc_duration_seconds{quantile="0.99"} 76656`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("rpc_duration_seconds")}, {Name: []byte("quantile"), Value: []byte("0.99")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 76656, Timestamp: 0}, }, }, }, { input: `rpc_duration_seconds_sum 1.7560473e+07`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("rpc_duration_seconds_sum")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 1.7560473e+07, Timestamp: 0}, }, }, }, { input: `rpc_duration_seconds_count 2693`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("rpc_duration_seconds_count")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 2693, Timestamp: 0}, }, }, }, { input: `rpc_duration_seconds_count{} 2693`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("rpc_duration_seconds_count")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 2693, Timestamp: 0}, }, }, }, { input: `container_cpu_usage {container="liveness-probe",id="/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod14e7c01a_0774_45f2_bb6a_b10e8e7f439e.slice/cri-containerd-51fa77b79a00e161e839762b7e4cbb6abfbaec68d3d7f56bd9665b9d48816894.scope",image="mcr.microsoft.com/oss/kubernetes-csi/livenessprobe:v2."} 0 1726114705191`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("container_cpu_usage")}, {Name: []byte("container"), Value: []byte("liveness-probe")}, {Name: []byte("id"), Value: []byte("/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod14e7c01a_0774_45f2_bb6a_b10e8e7f439e.slice/cri-containerd-51fa77b79a00e161e839762b7e4cbb6abfbaec68d3d7f56bd9665b9d48816894.scope")}, {Name: []byte("image"), Value: []byte("mcr.microsoft.com/oss/kubernetes-csi/livenessprobe:v2.")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 0, Timestamp: 1726114705191}, }, }, }, { input: `node_cpu_usage_seconds_total 157399.56 1726236687470`, - want: TimeSeries{ - Labels: []Label{ + want: &TimeSeries{ + Labels: []*Label{ {Name: []byte("__name__"), Value: []byte("node_cpu_usage_seconds_total")}, }, - Samples: []Sample{ + Samples: []*Sample{ {Value: 157399.56, Timestamp: 1726236687470}, }, }, @@ -429,12 +429,12 @@ func BenchmarkIterator_TimeSeries(b *testing.B) { sampleInput := `http_requests_total{method="post",code="200"} 1027 1395066363000` iter := NewIterator(io.NopCloser(strings.NewReader(sampleInput))) require.True(b, iter.Next()) + ts := &TimeSeries{} for i := 0; i < b.N; i++ { - - _, err := iter.TimeSeries() + ts.Reset() + _, err := iter.TimeSeriesInto(ts) if err != nil { b.Fatalf("unexpected error: %v", err) } - iter.Close() } } diff --git a/pkg/prompb/protobuf.go b/pkg/prompb/protobuf.go index f7960c6f..00aab95d 100644 --- a/pkg/prompb/protobuf.go +++ b/pkg/prompb/protobuf.go @@ -3,23 +3,29 @@ package prompb import ( "fmt" mathbits "math/bits" + "sync" "github.com/VictoriaMetrics/easyproto" ) var ( - mp = &easyproto.MarshalerPool{} + mp = &easyproto.MarshalerPool{} + TimeSeriesPool = sync.Pool{ + New: func() interface{} { + return &TimeSeries{} + }, + } ) // WriteRequest represents Prometheus remote write API request type WriteRequest struct { - Timeseries []TimeSeries + Timeseries []*TimeSeries } // TimeSeries is a timeseries. type TimeSeries struct { - Labels []Label - Samples []Sample + Labels []*Label + Samples []*Sample } // Label is a timeseries label @@ -51,10 +57,13 @@ func (wr *WriteRequest) Unmarshal(src []byte) (err error) { } if cap(wr.Timeseries) > len(wr.Timeseries) { wr.Timeseries = wr.Timeseries[:len(wr.Timeseries)+1] + if wr.Timeseries[len(wr.Timeseries)-1] == nil { + wr.Timeseries[len(wr.Timeseries)-1] = &TimeSeries{} + } } else { - wr.Timeseries = append(wr.Timeseries, TimeSeries{}) + wr.Timeseries = append(wr.Timeseries, &TimeSeries{}) } - ts := &wr.Timeseries[len(wr.Timeseries)-1] + ts := wr.Timeseries[len(wr.Timeseries)-1] if err := ts.unmarshalProtobuf(data); err != nil { return fmt.Errorf("cannot unmarshal sample: %w", err) } @@ -83,8 +92,9 @@ func (wr *WriteRequest) MarshalTo(dst []byte) ([]byte, error) { // Reset resets wr. func (wr *WriteRequest) Reset() { for i := range wr.Timeseries { - ts := &wr.Timeseries[i] + ts := wr.Timeseries[i] ts.Reset() + wr.Timeseries[i] = ts } wr.Timeseries = wr.Timeseries[:0] } @@ -208,10 +218,13 @@ func (m *TimeSeries) unmarshalProtobuf(src []byte) (err error) { } if cap(m.Labels) > len(m.Labels) { m.Labels = m.Labels[:len(m.Labels)+1] + if m.Labels[len(m.Labels)-1] == nil { + m.Labels[len(m.Labels)-1] = &Label{} + } } else { - m.Labels = append(m.Labels, Label{}) + m.Labels = append(m.Labels, &Label{}) } - s := &m.Labels[len(m.Labels)-1] + s := m.Labels[len(m.Labels)-1] if err := s.unmarshalProtobuf(data); err != nil { return fmt.Errorf("cannot unmarshal sample: %w", err) } @@ -222,10 +235,13 @@ func (m *TimeSeries) unmarshalProtobuf(src []byte) (err error) { } if cap(m.Samples) > len(m.Samples) { m.Samples = m.Samples[:len(m.Samples)+1] + if m.Samples[len(m.Samples)-1] == nil { + m.Samples[len(m.Samples)-1] = &Sample{} + } } else { - m.Samples = append(m.Samples, Sample{}) + m.Samples = append(m.Samples, &Sample{}) } - s := &m.Samples[len(m.Samples)-1] + s := m.Samples[len(m.Samples)-1] if err := s.unmarshalProtobuf(data); err != nil { return fmt.Errorf("cannot unmarshal sample: %w", err) } @@ -236,17 +252,64 @@ func (m *TimeSeries) unmarshalProtobuf(src []byte) (err error) { func (ts *TimeSeries) Reset() { for i := range ts.Labels { - l := &ts.Labels[i] + l := ts.Labels[i] l.Reset() + ts.Labels[i] = l } for i := range ts.Samples { - s := &ts.Samples[i] + s := ts.Samples[i] s.Reset() + ts.Samples[i] = s } ts.Labels = ts.Labels[:0] ts.Samples = ts.Samples[:0] } +func (m *TimeSeries) AppendLabel(key []byte, value []byte) { + if cap(m.Labels) > len(m.Labels) { + m.Labels = m.Labels[:len(m.Labels)+1] + if m.Labels[len(m.Labels)-1] == nil { + m.Labels[len(m.Labels)-1] = &Label{} + } + } else { + m.Labels = append(m.Labels, &Label{}) + } + l := m.Labels[len(m.Labels)-1] + l.Reset() + l.Name = append(l.Name[:0], key...) + l.Value = append(l.Value[:0], value...) +} + +func (m *TimeSeries) AppendLabelString(key string, value string) { + if cap(m.Labels) > len(m.Labels) { + m.Labels = m.Labels[:len(m.Labels)+1] + if m.Labels[len(m.Labels)-1] == nil { + m.Labels[len(m.Labels)-1] = &Label{} + } + } else { + m.Labels = append(m.Labels, &Label{}) + } + l := m.Labels[len(m.Labels)-1] + l.Reset() + l.Name = append(l.Name[:0], key...) + l.Value = append(l.Value[:0], value...) +} + +func (m *TimeSeries) AppendSample(timestamp int64, value float64) { + if cap(m.Samples) > len(m.Samples) { + m.Samples = m.Samples[:len(m.Samples)+1] + if m.Samples[len(m.Samples)-1] == nil { + m.Samples[len(m.Samples)-1] = &Sample{} + } + } else { + m.Samples = append(m.Samples, &Sample{}) + } + s := m.Samples[len(m.Samples)-1] + s.Reset() + s.Timestamp = timestamp + s.Value = value +} + func (m *Label) Size() (n int) { if m == nil { return 0 diff --git a/pkg/prompb/protobuf_test.go b/pkg/prompb/protobuf_test.go index 1c113245..2a2d49e2 100644 --- a/pkg/prompb/protobuf_test.go +++ b/pkg/prompb/protobuf_test.go @@ -9,14 +9,14 @@ import ( func TestMarshal(t *testing.T) { wr := WriteRequest{ - Timeseries: []TimeSeries{ + Timeseries: []*TimeSeries{ { - Labels: []Label{ + Labels: []*Label{ { Name: []byte("__name__"), Value: []byte("cpu")}, }, - Samples: []Sample{ + Samples: []*Sample{ { Timestamp: int64(1), Value: 1.0, @@ -34,14 +34,14 @@ func TestMarshal(t *testing.T) { func TestMarshalTo(t *testing.T) { wr := WriteRequest{ - Timeseries: []TimeSeries{ + Timeseries: []*TimeSeries{ { - Labels: []Label{ + Labels: []*Label{ { Name: []byte("__name__"), Value: []byte("cpu")}, }, - Samples: []Sample{ + Samples: []*Sample{ { Timestamp: int64(1), Value: 1.0, diff --git a/pkg/prompb/sort.go b/pkg/prompb/sort.go index d04f4285..a0ac4cc7 100644 --- a/pkg/prompb/sort.go +++ b/pkg/prompb/sort.go @@ -20,7 +20,7 @@ func (l Labels) Swap(i, j int) { } // IsSorted return true if the labels are sorted according to Sort. -func IsSorted(l []Label) bool { +func IsSorted(l []*Label) bool { if len(l) == 1 { return true } @@ -33,7 +33,7 @@ func IsSorted(l []Label) bool { } // Sort sorts labels ensuring the __name__ is first the remaining labels or ordered by name. -func Sort(l []Label) { +func Sort(l []*Label) { sort.Slice(l, func(i, j int) bool { return labelLess(l[i].Name, l[j].Name) }) diff --git a/pkg/prompb/sort_test.go b/pkg/prompb/sort_test.go index 429a7081..ae3569a3 100644 --- a/pkg/prompb/sort_test.go +++ b/pkg/prompb/sort_test.go @@ -7,7 +7,7 @@ import ( ) func TestSortedLabels(t *testing.T) { - l := []Label{ + l := []*Label{ { Name: []byte("foo"), Value: []byte("bar"), @@ -59,7 +59,7 @@ func TestCompareLower(t *testing.T) { } func BenchmarkIsSorted(b *testing.B) { - l := []Label{ + l := []*Label{ { Name: []byte("foo"), Value: []byte("bar"), diff --git a/pkg/prompb/util.go b/pkg/prompb/util.go index 94bbb54a..201610f7 100644 --- a/pkg/prompb/util.go +++ b/pkg/prompb/util.go @@ -1,6 +1,6 @@ package prompb -func MetricName(ts TimeSeries) []byte { +func MetricName(ts *TimeSeries) []byte { for _, l := range ts.Labels { if string(l.Name) == "__name__" { return l.Value diff --git a/pkg/promremote/proxy.go b/pkg/promremote/proxy.go index 2682e8de..e78d3f90 100644 --- a/pkg/promremote/proxy.go +++ b/pkg/promremote/proxy.go @@ -20,9 +20,9 @@ type RemoteWriteProxy struct { disableMetricsForwarding bool // queue is the channel where incoming writes are queued. These are arbitrary sized writes. - queue chan prompb.WriteRequest + queue chan *prompb.WriteRequest // ready is the channel where writes are batched up to maxBatchSize and ready to be sent to the remote write endpoint. - ready chan prompb.WriteRequest + ready chan *prompb.WriteRequest cancelFn context.CancelFunc } @@ -33,8 +33,8 @@ func NewRemoteWriteProxy(client *Client, endpoints []string, maxBatchSize int, d endpoints: endpoints, maxBatchSize: maxBatchSize, disableMetricsForwarding: disableMetricsForwarding, - queue: make(chan prompb.WriteRequest, 100), - ready: make(chan prompb.WriteRequest, 5), + queue: make(chan *prompb.WriteRequest, 100), + ready: make(chan *prompb.WriteRequest, 5), } return p } @@ -54,7 +54,7 @@ func (r *RemoteWriteProxy) Close() error { return nil } -func (r *RemoteWriteProxy) Write(ctx context.Context, wr prompb.WriteRequest) error { +func (r *RemoteWriteProxy) Write(ctx context.Context, wr *prompb.WriteRequest) error { if logger.IsDebug() { var sb strings.Builder for _, ts := range wr.Timeseries { @@ -90,7 +90,7 @@ func (r *RemoteWriteProxy) Write(ctx context.Context, wr prompb.WriteRequest) er } func (c *RemoteWriteProxy) flush(ctx context.Context) { - var pendingBatch prompb.WriteRequest + pendingBatch := &prompb.WriteRequest{} for { select { @@ -101,14 +101,14 @@ func (c *RemoteWriteProxy) flush(ctx context.Context) { // Flush as many full queue as we can for len(pendingBatch.Timeseries) >= c.maxBatchSize { - var nextBatch prompb.WriteRequest + nextBatch := &prompb.WriteRequest{} nextBatch.Timeseries = append(nextBatch.Timeseries, pendingBatch.Timeseries[:c.maxBatchSize]...) pendingBatch.Timeseries = append(pendingBatch.Timeseries[:0], pendingBatch.Timeseries[c.maxBatchSize:]...) c.ready <- nextBatch } case <-time.After(10 * time.Second): for len(pendingBatch.Timeseries) >= c.maxBatchSize { - var nextBatch prompb.WriteRequest + nextBatch := &prompb.WriteRequest{} nextBatch.Timeseries = append(nextBatch.Timeseries, pendingBatch.Timeseries[:c.maxBatchSize]...) pendingBatch.Timeseries = append(pendingBatch.Timeseries[:0], pendingBatch.Timeseries[c.maxBatchSize:]...) c.ready <- nextBatch @@ -116,7 +116,7 @@ func (c *RemoteWriteProxy) flush(ctx context.Context) { if len(pendingBatch.Timeseries) == 0 { continue } - var nextBatch prompb.WriteRequest + nextBatch := &prompb.WriteRequest{} nextBatch.Timeseries = append(nextBatch.Timeseries, pendingBatch.Timeseries...) c.ready <- nextBatch @@ -160,7 +160,7 @@ func (p *RemoteWriteProxy) sendBatch(ctx context.Context) error { for _, endpoint := range p.endpoints { endpoint := endpoint g.Go(func() error { - return p.client.Write(gCtx, endpoint, &wr) + return p.client.Write(gCtx, endpoint, wr) }) } logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(p.endpoints), time.Since(start)) diff --git a/tools/data/metric.go b/tools/data/metric.go index 91743143..90cb06b1 100644 --- a/tools/data/metric.go +++ b/tools/data/metric.go @@ -16,20 +16,20 @@ type SetOptions struct { } type Metric struct { - Labels []prompb.Label + Labels []*prompb.Label value float64 cardinality int n int } -func (m *Metric) Next(timestamp time.Time) prompb.TimeSeries { +func (m *Metric) Next(timestamp time.Time) *prompb.TimeSeries { // Randomize the cardinality index so that we get more distinct sets of active series sent concurrently. if m.n == 0 { m.n = rand.Intn(m.cardinality) } - ts := prompb.TimeSeries{ + ts := &prompb.TimeSeries{ Labels: m.Labels, - Samples: []prompb.Sample{{ + Samples: []*prompb.Sample{{ Value: m.value, Timestamp: timestamp.UnixNano() / 1e6, }}, @@ -46,14 +46,14 @@ type Set struct { opts SetOptions } -func (s *Set) Next(timestamp time.Time) prompb.TimeSeries { +func (s *Set) Next(timestamp time.Time) *prompb.TimeSeries { metric := s.metrics[s.i%len(s.metrics)] ts := metric.Next(timestamp) - ts.Labels = append(ts.Labels, prompb.Label{ + ts.Labels = append(ts.Labels, &prompb.Label{ Name: []byte("adxmon_database"), Value: []byte(s.opts.Database), }) - ts.Labels = append(ts.Labels, prompb.Label{ + ts.Labels = append(ts.Labels, &prompb.Label{ Name: []byte("host"), Value: []byte(fmt.Sprintf("host_%d", metric.n%metric.cardinality)), }) @@ -68,116 +68,116 @@ func NewDataSet(opts SetOptions) *Set { s := &Set{ opts: opts, metrics: []*Metric{ - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("cpu")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("mem")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("mem_total")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("mem_free")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("mem_cached")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("mem_buffers")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("disk")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("disk_total")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("disk_used")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("disk_free")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("net")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("netio")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("diskio")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("cpu_util")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("cpu_load1")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("cpu_load5")}, {[]byte("region"), []byte("eastus")}, }, cardinality: opts.Cardinality, }, - {Labels: []prompb.Label{ + {Labels: []*prompb.Label{ {[]byte("__name__"), []byte("cpu_load15")}, {[]byte("region"), []byte("eastus")}, }, @@ -188,7 +188,7 @@ func NewDataSet(opts SetOptions) *Set { for i := 0; i < 1000; i++ { s.metrics = append(s.metrics, - &Metric{Labels: []prompb.Label{ + &Metric{Labels: []*prompb.Label{ {[]byte("__name__"), []byte(fmt.Sprintf("metric%d", i))}, {[]byte("region"), []byte("eastus")}, }, diff --git a/vendor/github.com/valyala/fastjson/LICENSE b/vendor/github.com/valyala/fastjson/LICENSE new file mode 100644 index 00000000..6f665f3e --- /dev/null +++ b/vendor/github.com/valyala/fastjson/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2018 Aliaksandr Valialkin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/valyala/fastjson/fastfloat/parse.go b/vendor/github.com/valyala/fastjson/fastfloat/parse.go new file mode 100644 index 00000000..b37838da --- /dev/null +++ b/vendor/github.com/valyala/fastjson/fastfloat/parse.go @@ -0,0 +1,515 @@ +package fastfloat + +import ( + "fmt" + "math" + "strconv" + "strings" +) + +// ParseUint64BestEffort parses uint64 number s. +// +// It is equivalent to strconv.ParseUint(s, 10, 64), but is faster. +// +// 0 is returned if the number cannot be parsed. +// See also ParseUint64, which returns parse error if the number cannot be parsed. +func ParseUint64BestEffort(s string) uint64 { + if len(s) == 0 { + return 0 + } + i := uint(0) + d := uint64(0) + j := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + d = d*10 + uint64(s[i]-'0') + i++ + if i > 18 { + // The integer part may be out of range for uint64. + // Fall back to slow parsing. + dd, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return 0 + } + return dd + } + continue + } + break + } + if i <= j { + return 0 + } + if i < uint(len(s)) { + // Unparsed tail left. + return 0 + } + return d +} + +// ParseUint64 parses uint64 from s. +// +// It is equivalent to strconv.ParseUint(s, 10, 64), but is faster. +// +// See also ParseUint64BestEffort. +func ParseUint64(s string) (uint64, error) { + if len(s) == 0 { + return 0, fmt.Errorf("cannot parse uint64 from empty string") + } + i := uint(0) + d := uint64(0) + j := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + d = d*10 + uint64(s[i]-'0') + i++ + if i > 18 { + // The integer part may be out of range for uint64. + // Fall back to slow parsing. + dd, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return 0, err + } + return dd, nil + } + continue + } + break + } + if i <= j { + return 0, fmt.Errorf("cannot parse uint64 from %q", s) + } + if i < uint(len(s)) { + // Unparsed tail left. + return 0, fmt.Errorf("unparsed tail left after parsing uint64 from %q: %q", s, s[i:]) + } + return d, nil +} + +// ParseInt64BestEffort parses int64 number s. +// +// It is equivalent to strconv.ParseInt(s, 10, 64), but is faster. +// +// 0 is returned if the number cannot be parsed. +// See also ParseInt64, which returns parse error if the number cannot be parsed. +func ParseInt64BestEffort(s string) int64 { + if len(s) == 0 { + return 0 + } + i := uint(0) + minus := s[0] == '-' + if minus { + i++ + if i >= uint(len(s)) { + return 0 + } + } + + d := int64(0) + j := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + d = d*10 + int64(s[i]-'0') + i++ + if i > 18 { + // The integer part may be out of range for int64. + // Fall back to slow parsing. + dd, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0 + } + return dd + } + continue + } + break + } + if i <= j { + return 0 + } + if i < uint(len(s)) { + // Unparsed tail left. + return 0 + } + if minus { + d = -d + } + return d +} + +// ParseInt64 parses int64 number s. +// +// It is equivalent to strconv.ParseInt(s, 10, 64), but is faster. +// +// See also ParseInt64BestEffort. +func ParseInt64(s string) (int64, error) { + if len(s) == 0 { + return 0, fmt.Errorf("cannot parse int64 from empty string") + } + i := uint(0) + minus := s[0] == '-' + if minus { + i++ + if i >= uint(len(s)) { + return 0, fmt.Errorf("cannot parse int64 from %q", s) + } + } + + d := int64(0) + j := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + d = d*10 + int64(s[i]-'0') + i++ + if i > 18 { + // The integer part may be out of range for int64. + // Fall back to slow parsing. + dd, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0, err + } + return dd, nil + } + continue + } + break + } + if i <= j { + return 0, fmt.Errorf("cannot parse int64 from %q", s) + } + if i < uint(len(s)) { + // Unparsed tail left. + return 0, fmt.Errorf("unparsed tail left after parsing int64 form %q: %q", s, s[i:]) + } + if minus { + d = -d + } + return d, nil +} + +// Exact powers of 10. +// +// This works faster than math.Pow10, since it avoids additional multiplication. +var float64pow10 = [...]float64{ + 1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, +} + +// ParseBestEffort parses floating-point number s. +// +// It is equivalent to strconv.ParseFloat(s, 64), but is faster. +// +// 0 is returned if the number cannot be parsed. +// See also Parse, which returns parse error if the number cannot be parsed. +func ParseBestEffort(s string) float64 { + if len(s) == 0 { + return 0 + } + i := uint(0) + minus := s[0] == '-' + if minus { + i++ + if i >= uint(len(s)) { + return 0 + } + } + + // the integer part might be elided to remain compliant + // with https://go.dev/ref/spec#Floating-point_literals + if s[i] == '.' && (i+1 >= uint(len(s)) || s[i+1] < '0' || s[i+1] > '9') { + return 0 + } + + d := uint64(0) + j := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + d = d*10 + uint64(s[i]-'0') + i++ + if i > 18 { + // The integer part may be out of range for uint64. + // Fall back to slow parsing. + f, err := strconv.ParseFloat(s, 64) + if err != nil && !math.IsInf(f, 0) { + return 0 + } + return f + } + continue + } + break + } + if i <= j && s[i] != '.' { + s = s[i:] + if strings.HasPrefix(s, "+") { + s = s[1:] + } + // "infinity" is needed for OpenMetrics support. + // See https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md + if strings.EqualFold(s, "inf") || strings.EqualFold(s, "infinity") { + if minus { + return -inf + } + return inf + } + if strings.EqualFold(s, "nan") { + return nan + } + return 0 + } + f := float64(d) + if i >= uint(len(s)) { + // Fast path - just integer. + if minus { + f = -f + } + return f + } + + if s[i] == '.' { + // Parse fractional part. + i++ + if i >= uint(len(s)) { + // the fractional part may be elided to remain compliant + // with https://go.dev/ref/spec#Floating-point_literals + return f + } + k := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + d = d*10 + uint64(s[i]-'0') + i++ + if i-j >= uint(len(float64pow10)) { + // The mantissa is out of range. Fall back to standard parsing. + f, err := strconv.ParseFloat(s, 64) + if err != nil && !math.IsInf(f, 0) { + return 0 + } + return f + } + continue + } + break + } + if i < k { + return 0 + } + // Convert the entire mantissa to a float at once to avoid rounding errors. + f = float64(d) / float64pow10[i-k] + if i >= uint(len(s)) { + // Fast path - parsed fractional number. + if minus { + f = -f + } + return f + } + } + if s[i] == 'e' || s[i] == 'E' { + // Parse exponent part. + i++ + if i >= uint(len(s)) { + return 0 + } + expMinus := false + if s[i] == '+' || s[i] == '-' { + expMinus = s[i] == '-' + i++ + if i >= uint(len(s)) { + return 0 + } + } + exp := int16(0) + j := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + exp = exp*10 + int16(s[i]-'0') + i++ + if exp > 300 { + // The exponent may be too big for float64. + // Fall back to standard parsing. + f, err := strconv.ParseFloat(s, 64) + if err != nil && !math.IsInf(f, 0) { + return 0 + } + return f + } + continue + } + break + } + if i <= j { + return 0 + } + if expMinus { + exp = -exp + } + f *= math.Pow10(int(exp)) + if i >= uint(len(s)) { + if minus { + f = -f + } + return f + } + } + return 0 +} + +// Parse parses floating-point number s. +// +// It is equivalent to strconv.ParseFloat(s, 64), but is faster. +// +// See also ParseBestEffort. +func Parse(s string) (float64, error) { + if len(s) == 0 { + return 0, fmt.Errorf("cannot parse float64 from empty string") + } + i := uint(0) + minus := s[0] == '-' + if minus { + i++ + if i >= uint(len(s)) { + return 0, fmt.Errorf("cannot parse float64 from %q", s) + } + } + + // the integer part might be elided to remain compliant + // with https://go.dev/ref/spec#Floating-point_literals + if s[i] == '.' && (i+1 >= uint(len(s)) || s[i+1] < '0' || s[i+1] > '9') { + return 0, fmt.Errorf("missing integer and fractional part in %q", s) + } + + d := uint64(0) + j := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + d = d*10 + uint64(s[i]-'0') + i++ + if i > 18 { + // The integer part may be out of range for uint64. + // Fall back to slow parsing. + f, err := strconv.ParseFloat(s, 64) + if err != nil && !math.IsInf(f, 0) { + return 0, err + } + return f, nil + } + continue + } + break + } + if i <= j && s[i] != '.' { + ss := s[i:] + if strings.HasPrefix(ss, "+") { + ss = ss[1:] + } + // "infinity" is needed for OpenMetrics support. + // See https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md + if strings.EqualFold(ss, "inf") || strings.EqualFold(ss, "infinity") { + if minus { + return -inf, nil + } + return inf, nil + } + if strings.EqualFold(ss, "nan") { + return nan, nil + } + return 0, fmt.Errorf("unparsed tail left after parsing float64 from %q: %q", s, ss) + } + f := float64(d) + if i >= uint(len(s)) { + // Fast path - just integer. + if minus { + f = -f + } + return f, nil + } + + if s[i] == '.' { + // Parse fractional part. + i++ + if i >= uint(len(s)) { + // the fractional part might be elided to remain compliant + // with https://go.dev/ref/spec#Floating-point_literals + return f, nil + } + k := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + d = d*10 + uint64(s[i]-'0') + i++ + if i-j >= uint(len(float64pow10)) { + // The mantissa is out of range. Fall back to standard parsing. + f, err := strconv.ParseFloat(s, 64) + if err != nil && !math.IsInf(f, 0) { + return 0, fmt.Errorf("cannot parse mantissa in %q: %s", s, err) + } + return f, nil + } + continue + } + break + } + if i < k { + return 0, fmt.Errorf("cannot find mantissa in %q", s) + } + // Convert the entire mantissa to a float at once to avoid rounding errors. + f = float64(d) / float64pow10[i-k] + if i >= uint(len(s)) { + // Fast path - parsed fractional number. + if minus { + f = -f + } + return f, nil + } + } + if s[i] == 'e' || s[i] == 'E' { + // Parse exponent part. + i++ + if i >= uint(len(s)) { + return 0, fmt.Errorf("cannot parse exponent in %q", s) + } + expMinus := false + if s[i] == '+' || s[i] == '-' { + expMinus = s[i] == '-' + i++ + if i >= uint(len(s)) { + return 0, fmt.Errorf("cannot parse exponent in %q", s) + } + } + exp := int16(0) + j := i + for i < uint(len(s)) { + if s[i] >= '0' && s[i] <= '9' { + exp = exp*10 + int16(s[i]-'0') + i++ + if exp > 300 { + // The exponent may be too big for float64. + // Fall back to standard parsing. + f, err := strconv.ParseFloat(s, 64) + if err != nil && !math.IsInf(f, 0) { + return 0, fmt.Errorf("cannot parse exponent in %q: %s", s, err) + } + return f, nil + } + continue + } + break + } + if i <= j { + return 0, fmt.Errorf("cannot parse exponent in %q", s) + } + if expMinus { + exp = -exp + } + f *= math.Pow10(int(exp)) + if i >= uint(len(s)) { + if minus { + f = -f + } + return f, nil + } + } + return 0, fmt.Errorf("cannot parse float64 from %q", s) +} + +var inf = math.Inf(1) +var nan = math.NaN() diff --git a/vendor/modules.txt b/vendor/modules.txt index 63950484..e849002e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -369,6 +369,9 @@ github.com/traefik/yaegi/stdlib # github.com/urfave/cli/v2 v2.27.4 ## explicit; go 1.18 github.com/urfave/cli/v2 +# github.com/valyala/fastjson v1.6.4 +## explicit; go 1.12 +github.com/valyala/fastjson/fastfloat # github.com/x448/float16 v0.8.4 ## explicit; go 1.11 github.com/x448/float16