Skip to content

Commit

Permalink
[Querier] optimize prom query
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Jan 19, 2024
1 parent 754e8f3 commit 9abb9f8
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 98 deletions.
1 change: 1 addition & 0 deletions server/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/deepflowio/deepflow/server/querier/querier"

logging "github.com/op/go-logging"
_ "go.uber.org/automaxprocs"
)

func execName() string {
Expand Down
1 change: 1 addition & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ require (
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/pyroscope-io/pyroscope v0.37.1
go.opentelemetry.io/collector/pdata v1.0.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b
skywalking.apache.org/repo/goapi v0.0.0-20230712035303-201c1fb2d6ec
)
Expand Down
3 changes: 3 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
Expand Down Expand Up @@ -716,6 +717,8 @@ go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lI
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand Down
43 changes: 28 additions & 15 deletions server/querier/app/prometheus/cache/remoteread_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,16 @@ func (c *CacheItem) FixupQueryTime(start int64, end int64) (int64, int64) {
return start, end
}

func getSeriesLabels(lb *[]prompb.Label) string {
sort.Slice(*lb, func(i, j int) bool { return (*lb)[i].Name < (*lb)[j].Name })
labels := make([]string, 0, len(*lb))
for i := 0; i < len(*lb); i++ {
labels = append(labels, (*lb)[i].Name+":"+(*lb)[i].Value)
func promLabelsEqual(a *[]prompb.Label, b *[]prompb.Label) bool {
if a == nil || b == nil || len(*a) != len(*b) {
return false
}
return strings.Join(labels, ",")
for i := 0; i < len(*a); i++ {
if (*a)[i].Name != (*b)[i].Name || (*a)[i].Value != (*b)[i].Value {
return false
}
}
return true
}

func (c *CacheItem) mergeResponse(start, end int64, query *prompb.ReadResponse) *prompb.ReadResponse {
Expand Down Expand Up @@ -196,11 +199,8 @@ func (c *CacheItem) mergeResponse(start, end int64, query *prompb.ReadResponse)
}

for _, ts := range queryTs {
labels := getSeriesLabels(&ts.Labels)

for _, existsTs := range cachedTs {
cachedLabels := getSeriesLabels(&existsTs.Labels)
if labels == cachedLabels {
if promLabelsEqual(&ts.Labels, &existsTs.Labels) {
existsSamples := existsTs.Samples
existsSamplesStart := existsSamples[0].Timestamp
existsSamplesEnd := existsSamples[len(existsSamples)-1].Timestamp
Expand Down Expand Up @@ -272,13 +272,16 @@ func NewRemoteReadQueryCache() *RemoteReadQueryCache {
return s
}

func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, query *prompb.ReadResponse) *prompb.ReadResponse {
func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, resp *prompb.ReadResponse) *prompb.ReadResponse {
if req == nil || len(req.Queries) == 0 {
return query
return resp
}
if resp == nil || len(resp.Results) == 0 {
return resp
}
q := req.Queries[0]
if q.Hints.Func == "series" {
return query
return resp
}

key, _ := promRequestToCacheKey(q)
Expand All @@ -296,7 +299,7 @@ func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, query *prompb
}()

if !ok {
item = &CacheItem{startTime: start, endTime: end, data: query, rwLock: &sync.RWMutex{}}
item = &CacheItem{startTime: start, endTime: end, data: resp, rwLock: &sync.RWMutex{}}
s.cache.Add(key, item)
} else {
// cache hit, merge data
Expand All @@ -306,7 +309,7 @@ func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, query *prompb
item.rwLock.Lock()
defer item.rwLock.Unlock()

item.data = item.mergeResponse(start, end, query)
item.data = item.mergeResponse(start, end, resp)
d := time.Since(t1)
atomic.AddUint64(&s.counter.Stats.CacheMergeDuration, uint64(d.Seconds()))
}
Expand Down Expand Up @@ -338,6 +341,16 @@ func copyResponse(cached *prompb.ReadResponse) *prompb.ReadResponse {
return resp
}

// Remove evicts the cache entry keyed by the first query of req.
// Requests that are nil or carry no queries are ignored. The key-derivation
// error is intentionally discarded, matching AddOrMerge and Get.
func (s *RemoteReadQueryCache) Remove(req *prompb.ReadRequest) {
	if req == nil {
		return
	}
	queries := req.Queries
	if len(queries) == 0 {
		return
	}
	cacheKey, _ := promRequestToCacheKey(queries[0])
	s.lock.Lock()
	defer s.lock.Unlock()
	s.cache.Remove(cacheKey)
}

func (s *RemoteReadQueryCache) Get(req *prompb.ReadRequest) (*prompb.ReadResponse, CacheHit, string, int64, int64) {
emptyResponse := &prompb.ReadResponse{}
if req == nil || len(req.Queries) == 0 {
Expand Down
23 changes: 11 additions & 12 deletions server/querier/app/prometheus/cache/response_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package cache
import (
"fmt"
"sort"
"strings"
"sync"
"unsafe"

Expand Down Expand Up @@ -270,12 +269,10 @@ func (c *Cacher) matrixMerge(resp promql.Matrix, cache *promql.Result) (promql.R
}
output := make(promql.Matrix, 0, len(cacheMatrix))
for _, cachedTs := range cacheMatrix {
cachedSeries := genSeriesLabelString(&cachedTs.Metric)
newSeries := promql.Series{Metric: cachedTs.Metric}
newSeries.Points = cachedTs.Points
for _, series := range resp {
respSeries := genSeriesLabelString(&series.Metric)
if respSeries == cachedSeries {
if labelsEqual(&cachedTs.Metric, &series.Metric) {
existsStartT := newSeries.Points[0].T
existsEndT := newSeries.Points[len(newSeries.Points)-1].T

Expand Down Expand Up @@ -315,12 +312,10 @@ func (c *Cacher) vectorMerge(resp promql.Vector, cached *promql.Result) (promql.
}
output := make(promql.Matrix, 0, len(cacheMatrix))
for _, cachedTs := range cacheMatrix {
cachedSeries := genSeriesLabelString(&cachedTs.Metric)
newSeries := promql.Series{Metric: cachedTs.Metric}
newSeries.Points = cachedTs.Points
for _, samples := range resp {
respSeries := genSeriesLabelString(&samples.Metric)
if respSeries == cachedSeries {
if labelsEqual(&cachedTs.Metric, &samples.Metric) {
insertedPointAt := sort.Search(len(newSeries.Points), func(i int) bool {
return newSeries.Points[i].T >= samples.Point.T
})
Expand Down Expand Up @@ -349,10 +344,14 @@ func vectorTomatrix(v *promql.Vector) promql.Matrix {
return output
}

func genSeriesLabelString(lb *labels.Labels) string {
lbs := make([]string, 0, len(*lb))
for i := 0; i < len(*lb); i++ {
lbs = append(lbs, fmt.Sprintf("%s=%s", (*lb)[i].Name, (*lb)[i].Value))
func labelsEqual(a *labels.Labels, b *labels.Labels) bool {
if a == nil || b == nil || len(*a) != len(*b) {
return false
}
return strings.Join(lbs, ",")
for i := 0; i < len(*a); i++ {
if (*a)[i].Name != (*b)[i].Name || (*a)[i].Value != (*b)[i].Value {
return false
}
}
return true
}
Loading

0 comments on commit 9abb9f8

Please sign in to comment.