Skip to content

Commit

Permalink
chore: update router reports payload behind a flag, emit stats to observe sizes (#5067)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth authored Sep 16, 2024
1 parent bbaaabd commit 7d74183
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 10 deletions.
1 change: 1 addition & 0 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Handle struct {
drainConcurrencyLimit config.ValueLoader[int]
workerInputBufferSize int
saveDestinationResponse bool
reportJobsdbPayload config.ValueLoader[bool]

diagnosisTickerTime time.Duration

Expand Down
1 change: 1 addition & 0 deletions router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (rt *Handle) Setup(
rt.eventOrderKeyThreshold = config.GetReloadableIntVar(200, 1, "Router."+destType+".eventOrderKeyThreshold", "Router.eventOrderKeyThreshold")
rt.eventOrderDisabledStateDuration = config.GetReloadableDurationVar(20, time.Minute, "Router."+destType+".eventOrderDisabledStateDuration", "Router.eventOrderDisabledStateDuration")
rt.eventOrderHalfEnabledStateDuration = config.GetReloadableDurationVar(10, time.Minute, "Router."+destType+".eventOrderHalfEnabledStateDuration", "Router.eventOrderHalfEnabledStateDuration")
rt.reportJobsdbPayload = config.GetReloadableBoolVar(true, "Router."+destType+".reportJobsdbPayload", "Router.reportJobsdbPayload")

statTags := stats.Tags{"destType": rt.destType}
rt.tracer = stats.Default.NewTracer("router")
Expand Down
4 changes: 2 additions & 2 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ var _ = Describe("router", func() {
Expect(metrics).To(HaveLen(1))
Expect(metrics[0].StatusDetail.StatusCode).To(Equal(200))
Expect(metrics[0].StatusDetail.Status).To(Equal(jobsdb.Succeeded.State))
Expect(metrics[0].StatusDetail.SampleEvent).To(Equal(json.RawMessage(`{"message": "some transformed message"}`)))
Expect(metrics[0].StatusDetail.SampleEvent).To(Equal(toRetryJobsList[0].EventPayload))
return nil
},
)
Expand Down Expand Up @@ -1458,7 +1458,7 @@ var _ = Describe("router", func() {
Expect(metrics[0].StatusDetail.StatusCode).To(Equal(500))
Expect(metrics[0].StatusDetail.Status).To(Equal(jobsdb.Failed.State))
Expect(metrics[0].StatusDetail.SampleEvent).To(Equal(json.RawMessage(gaPayload)))
Expect(metrics[0].StatusDetail.SampleResponse).To(ContainSubstring(`failureStage":"RudderStack Transformation Error"`))
Expect(metrics[0].StatusDetail.SampleResponse).To(ContainSubstring(`"routerSubStage":"router_dest_transformer"`))

return nil
},
Expand Down
27 changes: 20 additions & 7 deletions router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ func (w *worker) processDestinationJobs() {
respBody := strings.Join(respBodyArr, " ")
respStatusCodes, respBodys = w.prepareResponsesForJobs(&destinationJob, respStatusCode, respBody)
}
stats.Default.NewTaggedStat("router_delivery_payload_size_bytes", stats.HistogramType, stats.Tags{
"destType": w.rt.destType,
"workspaceID": destinationJob.JobMetadataArray[0].WorkspaceID,
"destinationID": destinationJob.JobMetadataArray[0].DestinationID,
}).Observe(float64(len(destinationJob.Message)))
}
}
ch <- struct{}{}
Expand Down Expand Up @@ -985,13 +990,9 @@ func (w *worker) postStatusOnResponseQ(respStatusCode int, payload json.RawMessa
}
}

inputPayload := payload
switch errorAt {
case routerutils.ERROR_AT_TF:
inputPayload = destinationJobMetadata.JobT.EventPayload
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "failureStage", "RudderStack Transformation Error")
default: // includes ERROR_AT_DEL, ERROR_AT_CUST
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "failureStage", "Destination Error")
inputPayload := destinationJobMetadata.JobT.EventPayload
if !w.rt.reportJobsdbPayload.Load() { // TODO: update default/remove this flag after monitoring the payload sizes
inputPayload = payload
}

status.ErrorResponse = routerutils.EnhanceJSON(status.ErrorResponse, "firstAttemptedAt", firstAttemptedAtTime.Format(misc.RFC3339Milli))
Expand All @@ -1012,6 +1013,18 @@ func (w *worker) postStatusOnResponseQ(respStatusCode int, payload json.RawMessa
}
return
}
if !isSuccessStatus(respStatusCode) {
switch errorAt {
case routerutils.ERROR_AT_TF:
inputPayload = destinationJobMetadata.JobT.EventPayload
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "routerSubStage", "router_dest_transformer")
default: // includes ERROR_AT_DEL, ERROR_AT_CUST
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "routerSubStage", "router_dest_delivery")
}
// TODO: update after observing the sizes of the payloads
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "payloadStage", "router_input")
}

// Saving payload to DB only
// 1. if job failed and
// 2. if router job undergoes batching or dest transform.
Expand Down
4 changes: 4 additions & 0 deletions runner/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,9 @@ var (
// 10microsecond, 250microsecond, 500microsecond, 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s
0.00001, 0.00025, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1,
},
"router_delivery_payload_size_bytes": {
float64(1 * bytesize.KB), float64(10 * bytesize.KB), float64(100 * bytesize.KB),
float64(1 * bytesize.MB), float64(3 * bytesize.MB), float64(5 * bytesize.MB), float64(10 * bytesize.MB),
},
}
)
2 changes: 1 addition & 1 deletion utils/types/reporting_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (

const MaxLengthExceeded = ":max-length-exceeded:"

var (
const (
DiffStatus = "diff"

// Module names
Expand Down

0 comments on commit 7d74183

Please sign in to comment.