Skip to content

Commit

Permalink
chore: add metrics for unique users reporter (#4856)
Browse files Browse the repository at this point in the history
* chore: add metrics for unique users reporter

* added more buckets

* added buckets for latency metric
  • Loading branch information
mihir20 authored Jul 8, 2024
1 parent 5ceb1d4 commit 98bf69a
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 4 deletions.
3 changes: 2 additions & 1 deletion enterprise/trackedusers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package trackedusers
import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

type Factory struct {
Expand All @@ -13,5 +14,5 @@ func (f *Factory) Setup(conf *config.Config) (UsersReporter, error) {
if !conf.GetBool("TrackedUsers.enabled", false) {
return NewNoopDataCollector(), nil
}
return NewUniqueUsersReporter(f.Log, conf)
return NewUniqueUsersReporter(f.Log, conf, stats.Default)
}
31 changes: 30 additions & 1 deletion enterprise/trackedusers/users_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/jobsdb"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
txn "github.com/rudderlabs/rudder-server/utils/tx"
Expand Down Expand Up @@ -53,9 +55,10 @@ type UniqueUsersReporter struct {
hllSettings *hll.Settings
instanceID string
now func() time.Time
stats stats.Stats
}

func NewUniqueUsersReporter(log logger.Logger, conf *config.Config) (*UniqueUsersReporter, error) {
func NewUniqueUsersReporter(log logger.Logger, conf *config.Config, stats stats.Stats) (*UniqueUsersReporter, error) {
return &UniqueUsersReporter{
log: log,
hllSettings: &hll.Settings{
Expand All @@ -65,6 +68,7 @@ func NewUniqueUsersReporter(log logger.Logger, conf *config.Config) (*UniqueUser
SparseEnabled: true,
},
instanceID: config.GetString("INSTANCE_ID", "1"),
stats: stats,
now: func() time.Time {
return time.Now()
},
Expand Down Expand Up @@ -178,6 +182,7 @@ func (u *UniqueUsersReporter) ReportUsers(ctx context.Context, reports []*UsersR
defer func() { _ = stmt.Close() }()

for _, report := range reports {
u.recordHllSizeStats(report)
userIDHllString, err := u.hllToString(report.UserIDHll)
if err != nil {
return fmt.Errorf("converting user id hll to string: %w", err)
Expand Down Expand Up @@ -239,3 +244,27 @@ func (u *UniqueUsersReporter) recordIdentifier(idTypeHllMap map[string]*hll.Hll,
idTypeHllMap[identifierType].AddRaw(murmur3.Sum64WithSeed([]byte(identifier), murmurSeed))
return idTypeHllMap
}

func (u *UniqueUsersReporter) recordHllSizeStats(report *UsersReport) {
if report.UserIDHll != nil {
u.stats.NewTaggedStat("tracked_users_hll_bytes", stats.HistogramType, stats.Tags{
"workspace_id": report.WorkspaceID,
"source_id": report.SourceID,
"identifier": idTypeUserID,
}).Observe(float64(len(report.UserIDHll.ToBytes())))
}
if report.AnonymousIDHll != nil {
u.stats.NewTaggedStat("tracked_users_hll_bytes", stats.HistogramType, stats.Tags{
"workspace_id": report.WorkspaceID,
"source_id": report.SourceID,
"identifier": idTypeAnonymousID,
}).Observe(float64(len(report.AnonymousIDHll.ToBytes())))
}
if report.IdentifiedAnonymousIDHll != nil {
u.stats.NewTaggedStat("tracked_users_hll_bytes", stats.HistogramType, stats.Tags{
"workspace_id": report.WorkspaceID,
"source_id": report.SourceID,
"identifier": idTypeIdentifiedAnonymousID,
}).Observe(float64(len(report.IdentifiedAnonymousIDHll.ToBytes())))
}
}
4 changes: 3 additions & 1 deletion enterprise/trackedusers/users_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-go-kit/config"

"github.com/spaolacci/murmur3"
Expand Down Expand Up @@ -377,7 +379,7 @@ func TestUniqueUsersReporter(t *testing.T) {
postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)

collector, err := NewUniqueUsersReporter(logger.NOP, config.Default)
collector, err := NewUniqueUsersReporter(logger.NOP, config.Default, stats.NOP)
require.NoError(t, err)
err = collector.MigrateDatabase(postgresContainer.DBDsn, config.Default)
require.NoError(t, err)
Expand Down
12 changes: 11 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ type processorStats struct {
processJobThroughput func(partition string) stats.Measurement
transformationsThroughput func(partition string) stats.Measurement
DBWriteThroughput func(partition string) stats.Measurement
trackedUsersReportGeneration func(partition string) stats.Measurement
}

type DestStatT struct {
Expand Down Expand Up @@ -606,6 +607,11 @@ func (proc *Handle) Setup(
"partition": partition,
})
}
proc.stats.trackedUsersReportGeneration = func(partition string) stats.Measurement {
return proc.statsFactory.NewTaggedStat("processor_tracked_users_report_gen_seconds", stats.TimerType, stats.Tags{
"partition": partition,
})
}
if proc.config.enableDedup {
proc.dedup = dedup.New(dedup.DefaultPath())
}
Expand Down Expand Up @@ -2019,6 +2025,10 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
if len(statusList) != len(jobList) {
panic(fmt.Errorf("len(statusList):%d != len(jobList):%d", len(statusList), len(jobList)))
}
trackedUsersReportGenStart := time.Now()
trackedUsersReports := proc.trackedUsersReporter.GenerateReportsFromJobs(jobList, proc.getNonEventStreamSources())
proc.stats.trackedUsersReportGeneration(partition).SendTiming(time.Since(trackedUsersReportGenStart))

processTime := time.Since(start)
proc.stats.processJobsTime(partition).SendTiming(processTime)
processJobThroughput := throughputPerSecond(totalEvents, processTime)
Expand All @@ -2040,7 +2050,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf

subJobs.hasMore,
subJobs.rsourcesStats,
proc.trackedUsersReporter.GenerateReportsFromJobs(jobList, proc.getNonEventStreamSources()),
trackedUsersReports,
}
}

Expand Down
14 changes: 14 additions & 0 deletions runner/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,19 @@ var (
// 0.1s, 0.5s, 1s, 5s, 1m, 5m, 10m, 30m, 1h, 12h, 24h
0.1, 0.5, 1, 5, 60, 300, 600, 1800, 3600, 12 * 3600, 24 * 3600,
},
"tracked_users_hll_bytes": {
float64(10 * bytesize.B), // for hll containing single id = 8B
float64(100 * bytesize.B), // for hll containing 10 ids = 80B
float64(400 * bytesize.B), // for hll containing 50 ids = 400B
float64(1 * bytesize.KB), // for hll containing 100 ids = 800B
float64(3 * bytesize.KB), // for hll containing 300 ids = 2400B
float64(6 * bytesize.KB), // for hll containing 700 ids = 5600B
float64(10 * bytesize.KB), // max size for hll with log2m=14, regWidth=5 = 10KB
float64(41 * bytesize.KB), // max size for hll with log2m=16, regWidth=5 = 40KB
},
"processor_tracked_users_report_gen_seconds": {
// 1microsecond, 2.5microsecond, 5microsecond, 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s
0.00001, 0.00025, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1,
},
}
)

0 comments on commit 98bf69a

Please sign in to comment.