Skip to content

Commit

Permalink
chore: measure delivery lag in gateway and processor (#4756)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored Jun 6, 2024
1 parent 0157b63 commit c8ba6d8
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 3 deletions.
24 changes: 22 additions & 2 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1754,6 +1754,17 @@ var _ = Describe("Gateway", func() {
return []byte(fmt.Sprintf(`[%s,%s]`, internalBatchPayload(), internalBatchPayload()))
}

// a second after receivedAt
now, err := time.Parse(time.RFC3339Nano, "2024-01-01T01:01:02.000000001Z")
Expect(err).To(BeNil())

statStore, err := memstats.New(
memstats.WithNow(func() time.Time {
return now
}),
)
Expect(err).To(BeNil())

BeforeEach(func() {
c.mockSuppressUser = mocksTypes.NewMockUserSuppression(c.mockCtrl)
c.mockSuppressUserFeature = mocksApp.NewMockSuppressUserFeature(c.mockCtrl)
Expand All @@ -1771,15 +1782,14 @@ var _ = Describe("Gateway", func() {
conf.Set("Gateway.enableSuppressUserFeature", true)
conf.Set("Gateway.enableEventSchemasFeature", false)

var err error
serverPort, err := kithelper.GetFreePort()
Expect(err).To(BeNil())
internalBatchEndpoint = fmt.Sprintf("http://localhost:%d/internal/v1/batch", serverPort)
GinkgoT().Setenv("RSERVER_GATEWAY_WEB_PORT", strconv.Itoa(serverPort))

gateway = &Handle{}
srcDebugger = mocksrcdebugger.NewMockSourceDebugger(c.mockCtrl)
err = gateway.Setup(context.Background(), conf, logger.NOP, stats.NOP, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), srcDebugger, nil)
err = gateway.Setup(context.Background(), conf, logger.NOP, statStore, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), srcDebugger, nil)
Expect(err).To(BeNil())
waitForBackendConfigInit(gateway)
c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).AnyTimes()
Expand Down Expand Up @@ -1816,6 +1826,16 @@ var _ = Describe("Gateway", func() {
resp, err := client.Do(req)
Expect(err).To(BeNil())
Expect(http.StatusOK, resp.StatusCode)

Expect(statStore.GetByName("gateway.pickup_delivery_lag_seconds")).To(Equal([]memstats.Metric{
{
Name: "gateway.pickup_delivery_lag_seconds",
Tags: map[string]string{"sourceId": SourceIDEnabled, "workspaceId": WorkspaceID},
Durations: []time.Duration{
time.Second,
},
},
}))
})

It("Successful request, without debugger", func() {
Expand Down
5 changes: 5 additions & 0 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,11 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
continue
}

gw.stats.NewTaggedStat("gateway.pickup_delivery_lag_seconds", stats.TimerType, stats.Tags{
"sourceId": msg.Properties.SourceID,
"workspaceId": msg.Properties.WorkspaceID,
}).Since(msg.Properties.ReceivedAt)

jobsDBParams := params{
MessageID: msg.Properties.MessageID,
SourceID: msg.Properties.SourceID,
Expand Down
6 changes: 6 additions & 0 deletions processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,9 @@ func WithAdaptiveLimit(adaptiveLimitFunction func(int64) int64) Opts {
l.Handle.adaptiveLimit = adaptiveLimitFunction
}
}

func WithStats(stats stats.Stats) Opts {
return func(l *LifecycleManager) {
l.Handle.statsFactory = stats
}
}
28 changes: 28 additions & 0 deletions processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/google/uuid"
. "github.com/onsi/gomega"
"github.com/ory/dockertest/v3"
"github.com/samber/lo"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
"github.com/rudderlabs/rudder-server/admin"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/reporting"
Expand Down Expand Up @@ -133,6 +135,7 @@ func genJobs(customVal string, jobCount, eventsPerJob int) []*jobsdb.JobT {
UUID: uuid.New(),
CustomVal: customVal,
EventCount: eventsPerJob,
WorkspaceId: "workspaceID",
}
}
return js
Expand Down Expand Up @@ -166,6 +169,15 @@ func TestProcessorManager(t *testing.T) {
require.NoError(t, err, "GetUnprocessed failed")
require.Equal(t, 0, len(unprocessedListEmpty.Jobs))

t.Log("now is a second after jobs' receivedAt time")
now, err := time.Parse(time.RFC3339Nano, "2021-06-06T20:26:40.598+05:30")
require.NoError(t, err, "parsing now")

statsStore, err := memstats.New(memstats.WithNow(func() time.Time {
return now
}))
require.NoError(t, err)

jobCountPerDS := 10
eventsPerJob := 10
err = tempDB.Store(context.Background(), genJobs(customVal, jobCountPerDS, eventsPerJob))
Expand Down Expand Up @@ -222,6 +234,7 @@ func TestProcessorManager(t *testing.T) {
destinationdebugger.NewNoOpService(),
transformationdebugger.NewNoOpService(),
[]enricher.PipelineEnricher{},
WithStats(statsStore),
func(m *LifecycleManager) {
m.Handle.config.enablePipelining = false
})
Expand Down Expand Up @@ -258,6 +271,21 @@ func TestProcessorManager(t *testing.T) {
require.NoError(t, err)
return len(res.Jobs)
}, 10*time.Minute, 100*time.Millisecond).Should(Equal(0))

require.Equal(t,
statsStore.GetByName("processor.pickup_delivery_lag_seconds"),
[]memstats.Metric{{
Name: "processor.pickup_delivery_lag_seconds",
Tags: map[string]string{
"sourceId": "sourceID",
"workspaceId": "workspaceID",
},
Durations: lo.Times(jobCountPerDS, func(i int) time.Duration {
return time.Second
}),
}},
"correctly capture the lag between job's receivedAt and now",
)
})

t.Run("adding more jobs after the processor is already running", func(t *testing.T) {
Expand Down
9 changes: 8 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ func (proc *Handle) Setup(
proc.instanceID = misc.GetInstanceID()

// Stats
proc.statsFactory = stats.Default
if proc.statsFactory == nil {
proc.statsFactory = stats.Default
}
proc.tracer = proc.statsFactory.NewTracer("processor")
proc.stats.statGatewayDBR = func(partition string) stats.Measurement {
return proc.statsFactory.NewTaggedStat("processor_gateway_db_read", stats.CountType, stats.Tags{
Expand Down Expand Up @@ -1615,6 +1617,11 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
requestIP := gatewayBatchEvent.RequestIP
receivedAt := gatewayBatchEvent.ReceivedAt

proc.statsFactory.NewSampledTaggedStat("processor.pickup_delivery_lag_seconds", stats.TimerType, stats.Tags{
"sourceId": sourceID,
"workspaceId": batchEvent.WorkspaceId,
}).Since(receivedAt)

newStatus := jobsdb.JobStatusT{
JobID: batchEvent.JobID,
JobState: jobsdb.Succeeded.State,
Expand Down
8 changes: 8 additions & 0 deletions runner/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,13 @@ var (
"router.kafka.batch_size": {
1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000,
},
"processor.pickup_delivery_lag_seconds": {
// 0.1s, 0.5s, 1s, 5s, 1m, 5m, 10m, 30m, 1h, 12h, 24h
0.1, 0.5, 1, 5, 60, 300, 600, 1800, 3600, 12 * 3600, 24 * 3600,
},
"gateway.pickup_delivery_lag_seconds": {
// 0.1s, 0.5s, 1s, 5s, 1m, 5m, 10m, 30m, 1h, 12h, 24h
0.1, 0.5, 1, 5, 60, 300, 600, 1800, 3600, 12 * 3600, 24 * 3600,
},
}
)

0 comments on commit c8ba6d8

Please sign in to comment.