Skip to content

Commit

Permalink
chore: event lag metric (follow up) (#4768)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored Jun 6, 2024
1 parent c8ba6d8 commit b124ce4
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 8 deletions.
4 changes: 2 additions & 2 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1827,9 +1827,9 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())
Expect(http.StatusOK, resp.StatusCode)

Expect(statStore.GetByName("gateway.pickup_delivery_lag_seconds")).To(Equal([]memstats.Metric{
Expect(statStore.GetByName("gateway.event_pickup_lag_seconds")).To(Equal([]memstats.Metric{
{
Name: "gateway.pickup_delivery_lag_seconds",
Name: "gateway.event_pickup_lag_seconds",
Tags: map[string]string{"sourceId": SourceIDEnabled, "workspaceId": WorkspaceID},
Durations: []time.Duration{
time.Second,
Expand Down
2 changes: 1 addition & 1 deletion gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
continue
}

gw.stats.NewTaggedStat("gateway.pickup_delivery_lag_seconds", stats.TimerType, stats.Tags{
gw.stats.NewTaggedStat("gateway.event_pickup_lag_seconds", stats.TimerType, stats.Tags{
"sourceId": msg.Properties.SourceID,
"workspaceId": msg.Properties.WorkspaceID,
}).Since(msg.Properties.ReceivedAt)
Expand Down
4 changes: 2 additions & 2 deletions processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ func TestProcessorManager(t *testing.T) {
}, 10*time.Minute, 100*time.Millisecond).Should(Equal(0))

require.Equal(t,
statsStore.GetByName("processor.pickup_delivery_lag_seconds"),
statsStore.GetByName("processor.event_pickup_lag_seconds"),
[]memstats.Metric{{
Name: "processor.pickup_delivery_lag_seconds",
Name: "processor.event_pickup_lag_seconds",
Tags: map[string]string{
"sourceId": "sourceID",
"workspaceId": "workspaceID",
Expand Down
2 changes: 1 addition & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,7 +1617,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
requestIP := gatewayBatchEvent.RequestIP
receivedAt := gatewayBatchEvent.ReceivedAt

proc.statsFactory.NewSampledTaggedStat("processor.pickup_delivery_lag_seconds", stats.TimerType, stats.Tags{
proc.statsFactory.NewSampledTaggedStat("processor.event_pickup_lag_seconds", stats.TimerType, stats.Tags{
"sourceId": sourceID,
"workspaceId": batchEvent.WorkspaceId,
}).Since(receivedAt)
Expand Down
4 changes: 2 additions & 2 deletions runner/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ var (
"router.kafka.batch_size": {
1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000,
},
"processor.pickup_delivery_lag_seconds": {
"processor.event_pickup_lag_seconds": {
// 0.1s, 0.5s, 1s, 5s, 1m, 5m, 10m, 30m, 1h, 12h, 24h
0.1, 0.5, 1, 5, 60, 300, 600, 1800, 3600, 12 * 3600, 24 * 3600,
},
"gateway.pickup_delivery_lag_seconds": {
"gateway.event_pickup_lag_seconds": {
// 0.1s, 0.5s, 1s, 5s, 1m, 5m, 10m, 30m, 1h, 12h, 24h
0.1, 0.5, 1, 5, 60, 300, 600, 1800, 3600, 12 * 3600, 24 * 3600,
},
Expand Down

0 comments on commit b124ce4

Please sign in to comment.