Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: common flusher for reporting with mtu handler #4823

Merged
merged 58 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
8674775
feat: define reporting flusher interface
satishrudderstack Jun 20, 2024
9dd764f
comments
satishrudderstack Jun 20, 2024
283c45c
add comments
satishrudderstack Jun 20, 2024
b7573fa
add in-app aggregator interface
satishrudderstack Jun 20, 2024
cb0f33b
add flusher as a struct
satishrudderstack Jun 20, 2024
a1181eb
add tracked users flusher struct
satishrudderstack Jun 20, 2024
83fadce
update tracked user flusher
satishrudderstack Jun 20, 2024
cefbf6e
add common flusher to reporting
satishrudderstack Jun 21, 2024
d8bc98b
cleanup
satishrudderstack Jun 21, 2024
542c3a6
cleanup
satishrudderstack Jun 21, 2024
6d9c9f4
add batching support while making calls to reporting
satishrudderstack Jun 24, 2024
fe8ceb2
add increase concurrency functionality
satishrudderstack Jun 24, 2024
a0f2398
aggressive flush if lag increases
satishrudderstack Jun 25, 2024
da7052e
refactor
satishrudderstack Jun 26, 2024
675258a
add tests
satishrudderstack Jun 27, 2024
9f50ef5
lint fixes
satishrudderstack Jun 27, 2024
ade2b1b
lint fixes
satishrudderstack Jun 27, 2024
3805fa1
lint fix
satishrudderstack Jun 27, 2024
7497ede
use utc time everywhere
satishrudderstack Jun 27, 2024
8054bf2
Merge branch 'master' into feat.tracked-users-reporting
satishrudderstack Jun 27, 2024
fcf4ea8
rename metrics
satishrudderstack Jun 27, 2024
374bc71
metrics naming
satishrudderstack Jun 27, 2024
6b6a33e
remove comments with TODOs
satishrudderstack Jun 27, 2024
44de774
use aggressive flush flag
satishrudderstack Jun 27, 2024
da6cb62
remove unused fields
satishrudderstack Jun 27, 2024
7dfa68e
rename prometheus metrics with convention
satishrudderstack Jun 28, 2024
36a94e8
flush the entire window at once
satishrudderstack Jun 28, 2024
4a4076e
fix typo
satishrudderstack Jun 28, 2024
17782bd
ensure flush window to be the same on retries
satishrudderstack Jun 28, 2024
340e775
change getRange to return isValid
satishrudderstack Jun 28, 2024
d685ef8
prometheus metrics naming
satishrudderstack Jun 28, 2024
63ce9d0
review comments
satishrudderstack Jul 1, 2024
63a1fb1
Merge branch 'master' into feat.tracked-users-reporting
satishrudderstack Jul 1, 2024
0317126
invoke tracked users flusher from core
satishrudderstack Jul 1, 2024
aae4e37
add an aggregator module which returns aggregated json reports
satishrudderstack Jul 2, 2024
9b80d27
add aggregator module
satishrudderstack Jul 2, 2024
f56437f
cleanup
satishrudderstack Jul 2, 2024
28e9be7
cleanup
satishrudderstack Jul 2, 2024
9dd9970
add cron runner file
satishrudderstack Jul 2, 2024
76eb527
add reportedAt to reporting payload
satishrudderstack Jul 2, 2024
02500eb
lint fixes
satishrudderstack Jul 2, 2024
e31f0c3
lint fixes
satishrudderstack Jul 2, 2024
7199623
add missing metrics
satishrudderstack Jul 3, 2024
cddc53d
add nop for runner
satishrudderstack Jul 3, 2024
aac3db0
add bytes sent metric
satishrudderstack Jul 3, 2024
624e6af
fix end
satishrudderstack Jul 3, 2024
a60a753
fix test
satishrudderstack Jul 3, 2024
42efbbf
review comments
satishrudderstack Jul 3, 2024
1b49de6
lint fixes
satishrudderstack Jul 3, 2024
4dc8da0
add tests for tracked users flushing
satishrudderstack Jul 4, 2024
3a92b2d
Merge branch 'master' into feat.tracked-users-reporting
satishrudderstack Jul 4, 2024
cd0e1da
fix go.mod
satishrudderstack Jul 4, 2024
183ea44
lint fix
satishrudderstack Jul 4, 2024
dc13987
lint fix
satishrudderstack Jul 4, 2024
5d494c7
remove flusher tests
satishrudderstack Jul 4, 2024
2ebb993
don't panic if there are errors in flusher, we need to emit lag metri…
satishrudderstack Jul 4, 2024
bedf901
review comments
satishrudderstack Jul 8, 2024
e3425f7
Merge branch 'master' into feat.tracked-users-reporting
satishrudderstack Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions enterprise/reporting/flusher/aggregator/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//go:generate mockgen -destination=./aggregator_mock.go -package=aggregator -source=./aggregator.go Aggregator
package aggregator

import (
"context"
"encoding/json"
"time"
)

// Aggregator produces the reports for a time window as raw JSON messages.
type Aggregator interface {
// Aggregate returns the JSON-encoded reports for the window delimited by start and end.
// NOTE(review): TrackedUsersInAppAggregator treats the window as [start, end) — confirm
// that other implementations are expected to follow the same convention.
Aggregate(ctx context.Context, start, end time.Time) (jsonReports []json.RawMessage, err error)
}
52 changes: 52 additions & 0 deletions enterprise/reporting/flusher/aggregator/aggregator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions enterprise/reporting/flusher/aggregator/nop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package aggregator

import (
"context"
"encoding/json"
"time"
)

// NOP is an aggregator that never produces any reports.
type NOP struct{}

// Aggregate ignores ctx, start and end and always returns a nil slice and a
// nil error.
func (n *NOP) Aggregate(ctx context.Context, start, end time.Time) (jsonReports []json.RawMessage, err error) {
	return nil, nil
}
120 changes: 120 additions & 0 deletions enterprise/reporting/flusher/aggregator/tracked_users_inapp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package aggregator

import (
"context"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"time"

"github.com/segmentio/go-hll"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
)

// tableName is the table the in-app aggregator reads from; it is also used as
// the value of the "table" stats tag.
const tableName = `tracked_users_reports`

// TrackedUsersInAppAggregator aggregates rows of the tracked users reports
// table in application code rather than in the database.
type TrackedUsersInAppAggregator struct {
db *sql.DB
stats stats.Stats

// reportsCounter observes the number of rows read per Aggregate call.
reportsCounter stats.Measurement
// aggReportsCounter observes the number of aggregated reports returned per Aggregate call.
aggReportsCounter stats.Measurement
}

// NewTrackedUsersInAppAggregator builds an aggregator that reads from db and
// records its measurements through s. Both histograms share the same tags: the
// INSTANCE_ID config value (default "1"), the table name and the given module.
func NewTrackedUsersInAppAggregator(db *sql.DB, s stats.Stats, conf *config.Config, module string) *TrackedUsersInAppAggregator {
	commonTags := stats.Tags{
		"instance": conf.GetString("INSTANCE_ID", "1"),
		"table":    tableName,
		"module":   module,
	}

	return &TrackedUsersInAppAggregator{
		db:                db,
		stats:             s,
		reportsCounter:    s.NewTaggedStat("reporting_flusher_get_reports_count", stats.HistogramType, commonTags),
		aggReportsCounter: s.NewTaggedStat("reporting_flusher_get_aggregated_reports_count", stats.HistogramType, commonTags),
	}
}

func (a *TrackedUsersInAppAggregator) Aggregate(ctx context.Context, start, end time.Time) (jsonReports []json.RawMessage, err error) {
query := `SELECT reported_at, workspace_id, source_id, instance_id, userid_hll, anonymousid_hll, identified_anonymousid_hll FROM ` + tableName + ` WHERE reported_at >= $1 AND reported_at < $2 ORDER BY reported_at`

rows, err := a.db.Query(query, start, end)
if err != nil {
return nil, fmt.Errorf("cannot read reports %w", err)

Check warning on line 44 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L44

Added line #L44 was not covered by tests
}
defer rows.Close()

total := 0
aggReportsMap := make(map[string]*TrackedUsersReport)
for rows.Next() {
total += 1
r := TrackedUsersReport{}
err := rows.Scan(&r.ReportedAt, &r.WorkspaceID, &r.SourceID, &r.InstanceID, &r.UserIDHLLHex, &r.AnonymousIDHLLHex, &r.IdentifiedAnonymousIDHLLHex)
if err != nil {
return nil, fmt.Errorf("cannot scan row %w", err)

Check warning on line 55 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L55

Added line #L55 was not covered by tests
}
r.UserIDHLL, err = a.decodeHLL(r.UserIDHLLHex)
if err != nil {
return nil, fmt.Errorf("cannot decode hll %w", err)

Check warning on line 59 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L59

Added line #L59 was not covered by tests
}
r.AnonymousIDHLL, err = a.decodeHLL(r.AnonymousIDHLLHex)
if err != nil {
return nil, fmt.Errorf("cannot decode hll %w", err)

Check warning on line 63 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L63

Added line #L63 was not covered by tests
}
r.IdentifiedAnonymousIDHLL, err = a.decodeHLL(r.IdentifiedAnonymousIDHLLHex)
if err != nil {
return nil, fmt.Errorf("cannot decode hll %w", err)

Check warning on line 67 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L67

Added line #L67 was not covered by tests
}

k := fmt.Sprintf("%s-%s-%s", r.WorkspaceID, r.SourceID, r.InstanceID)

if agg, exists := aggReportsMap[k]; exists {
agg.UserIDHLL.Union(*r.UserIDHLL)
agg.AnonymousIDHLL.Union(*r.AnonymousIDHLL)
agg.IdentifiedAnonymousIDHLL.Union(*r.IdentifiedAnonymousIDHLL)
aggReportsMap[k] = agg
} else {
aggReportsMap[k] = &r
}

}

if err := rows.Err(); err != nil {
return nil, fmt.Errorf("rows errors %w", err)

Check warning on line 84 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L84

Added line #L84 was not covered by tests
}

jsonReports, err = marshalReports(aggReportsMap)
if err != nil {
return nil, fmt.Errorf("error marshalling reports %w", err)

Check warning on line 89 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L89

Added line #L89 was not covered by tests
}

a.reportsCounter.Observe(float64(total))
a.aggReportsCounter.Observe(float64(len(jsonReports)))

return jsonReports, nil
}

func (a *TrackedUsersInAppAggregator) decodeHLL(encoded string) (*hll.Hll, error) {
data, err := hex.DecodeString(encoded)
if err != nil {
return nil, err

Check warning on line 101 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L101

Added line #L101 was not covered by tests
}
hll, err := hll.FromBytes(data)
if err != nil {
return nil, err

Check warning on line 105 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L105

Added line #L105 was not covered by tests
}
return &hll, nil
}

func marshalReports(aggReportsMap map[string]*TrackedUsersReport) ([]json.RawMessage, error) {
jsonReports := make([]json.RawMessage, 0, len(aggReportsMap))
for _, v := range aggReportsMap {
data, err := json.Marshal(v)
itsmihir marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err

Check warning on line 115 in enterprise/reporting/flusher/aggregator/tracked_users_inapp.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/aggregator/tracked_users_inapp.go#L115

Added line #L115 was not covered by tests
}
jsonReports = append(jsonReports, data)
}
return jsonReports, nil
}
35 changes: 35 additions & 0 deletions enterprise/reporting/flusher/aggregator/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package aggregator

import (
"encoding/hex"
"encoding/json"
"time"

"github.com/segmentio/go-hll"
)

// TrackedUsersReport is a tracked users report for one workspace, source and
// instance. The *HLL fields hold the decoded sketches and are excluded from
// JSON; the *HLLHex fields hold their hex encoding and are what is serialized.
type TrackedUsersReport struct {
ReportedAt time.Time `json:"reportedAt"`
WorkspaceID string `json:"workspaceId"`
SourceID string `json:"sourceId"`
InstanceID string `json:"instanceId"`
// Decoded sketches; never written to JSON directly.
UserIDHLL *hll.Hll `json:"-"`
AnonymousIDHLL *hll.Hll `json:"-"`
IdentifiedAnonymousIDHLL *hll.Hll `json:"-"`
// Hex encodings of the sketches above; MarshalJSON recomputes them before encoding.
UserIDHLLHex string `json:"userIdHLL"`
AnonymousIDHLLHex string `json:"anonymousIdHLL"`
IdentifiedAnonymousIDHLLHex string `json:"identifiedAnonymousIdHLL"`
}

func (t *TrackedUsersReport) MarshalJSON() ([]byte, error) {
t.UserIDHLLHex = hex.EncodeToString(t.UserIDHLL.ToBytes())
t.AnonymousIDHLLHex = hex.EncodeToString(t.AnonymousIDHLL.ToBytes())
t.IdentifiedAnonymousIDHLLHex = hex.EncodeToString(t.IdentifiedAnonymousIDHLL.ToBytes())

type Alias TrackedUsersReport
return json.Marshal(&struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is Alias needed? can't we directly use TrackedUsersReport?

*Alias
}{
Alias: (*Alias)(t),
})
}
130 changes: 130 additions & 0 deletions enterprise/reporting/flusher/cron_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package flusher

import (
"context"
"time"

"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
)

// Runner is a background job that can be started and stopped.
type Runner interface {
// Run starts the job.
// NOTE(review): CronRunner.Run blocks until its goroutine finishes — confirm
// whether callers are expected to invoke Run in a goroutine of their own.
Run()
// Stop stops the job.
Stop()
}

// NOPCronRunner is a runner whose Run and Stop do nothing.
type NOPCronRunner struct{}

// Run returns immediately without doing any work.
func (*NOPCronRunner) Run() {}

// Stop returns immediately without doing any work.
func (*NOPCronRunner) Stop() {}

// CronRunner repeatedly invokes a Flusher from a background goroutine until it
// is stopped.
type CronRunner struct {
// ctx is the errgroup-derived context handed to the flushing goroutine;
// cancel cancels the context it was derived from.
ctx context.Context
cancel context.CancelFunc
// g tracks the flushing goroutine so that Run and Stop can wait for it.
g *errgroup.Group

stats stats.Stats
log logger.Logger

// instanceId, table and module are used as tags on the runner's stats.
instanceId string
table string
module string
flusher *Flusher
// sleepInterval is the ticker period used between flushes
// (config key Reporting.flusher.sleepInterval, default 5s).
sleepInterval config.ValueLoader[time.Duration]

flushTimer stats.Measurement
reportingLag stats.Measurement

// started is set to true by Run and back to false by Stop.
started atomic.Bool
}

// NewCronRunner creates a CronRunner that drives flusher. The runner's context
// is derived from ctx twice: once with a cancel function (used by Stop) and
// once more through an errgroup, whose context is the one stored on the runner.
// The flush interval is read from Reporting.flusher.sleepInterval (default 5s)
// and the instance tag from INSTANCE_ID (default "1"). Stats are initialised
// before the runner is returned.
func NewCronRunner(ctx context.Context, log logger.Logger, stats stats.Stats, conf *config.Config, flusher *Flusher, table, module string) *CronRunner {
	cancellable, cancel := context.WithCancel(ctx)
	group, groupCtx := errgroup.WithContext(cancellable)

	runner := &CronRunner{
		ctx:           groupCtx,
		cancel:        cancel,
		g:             group,
		stats:         stats,
		log:           log,
		sleepInterval: conf.GetReloadableDurationVar(5, time.Second, "Reporting.flusher.sleepInterval"),
		instanceId:    conf.GetString("INSTANCE_ID", "1"),
		flusher:       flusher,
		table:         table,
		module:        module,
	}
	runner.initStats()

	return runner
}

// initStats creates the runner's measurements: a timer for the duration of
// each flush and a gauge for the reporting lag. Both carry the instance,
// table and module tags.
func (c *CronRunner) initStats() {
	tags := stats.Tags{"instance": c.instanceId, "table": c.table, "module": c.module}

	c.flushTimer = c.stats.NewTaggedStat("reporting_flusher_flush_duration_seconds", stats.TimerType, tags)
	c.reportingLag = c.stats.NewTaggedStat("reporting_flusher_lag_seconds", stats.GaugeType, tags)
}

// Run launches the flushing loop in the runner's errgroup, marks the runner as
// started and then blocks until the loop returns. An error returned by the
// loop is logged, not propagated.
func (c *CronRunner) Run() {
	c.g.Go(func() error { return c.startFlushing(c.ctx) })
	c.started.Store(true)

	err := c.g.Wait()
	if err == nil {
		return
	}
	c.log.Errorn("error in cron-runner", obskit.Error(err))
}

func (c *CronRunner) startFlushing(ctx context.Context) error {
ticker := time.NewTicker(c.sleepInterval.Load())
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()

Check warning on line 99 in enterprise/reporting/flusher/cron_runner.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/cron_runner.go#L98-L99

Added lines #L98 - L99 were not covered by tests
default:
s := time.Now()
if err := c.flusher.Flush(ctx); err != nil {
c.log.Errorn("error in Flush", obskit.Error(err))

Check warning on line 103 in enterprise/reporting/flusher/cron_runner.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/cron_runner.go#L103

Added line #L103 was not covered by tests
}
c.flushTimer.Since(s)

if !c.flusher.ShouldFlushAggressively(ctx) {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:

Check warning on line 111 in enterprise/reporting/flusher/cron_runner.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/cron_runner.go#L111

Added line #L111 was not covered by tests
}
}
}
}
}

func (c *CronRunner) Stop() {
c.cancel()
err := c.g.Wait()
if err != nil {
c.log.Errorn("error in stopping cron-runner", obskit.Error(err))
}

err = c.flusher.CleanUp()
if err != nil {
c.log.Errorn("error in flusher cleanup", obskit.Error(err))

Check warning on line 127 in enterprise/reporting/flusher/cron_runner.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/cron_runner.go#L127

Added line #L127 was not covered by tests
}
c.started.Store(false)
}
Loading
Loading