From 7d7418320869b5ed038c29cd5353806408cf17ea Mon Sep 17 00:00:00 2001 From: "siddarth.msv" <82795818+Sidddddarth@users.noreply.github.com> Date: Mon, 16 Sep 2024 15:48:30 +0530 Subject: [PATCH] chore: update router reports payload behind a flag, emit stats to observe sizes (#5067) --- router/handle.go | 1 + router/handle_lifecycle.go | 1 + router/router_test.go | 4 ++-- router/worker.go | 27 ++++++++++++++++++++------- runner/buckets.go | 4 ++++ utils/types/reporting_types.go | 2 +- 6 files changed, 29 insertions(+), 10 deletions(-) diff --git a/router/handle.go b/router/handle.go index d0f26bd112..889bcc96f5 100644 --- a/router/handle.go +++ b/router/handle.go @@ -75,6 +75,7 @@ type Handle struct { drainConcurrencyLimit config.ValueLoader[int] workerInputBufferSize int saveDestinationResponse bool + reportJobsdbPayload config.ValueLoader[bool] diagnosisTickerTime time.Duration diff --git a/router/handle_lifecycle.go b/router/handle_lifecycle.go index 261f93d71d..af56a621c3 100644 --- a/router/handle_lifecycle.go +++ b/router/handle_lifecycle.go @@ -108,6 +108,7 @@ func (rt *Handle) Setup( rt.eventOrderKeyThreshold = config.GetReloadableIntVar(200, 1, "Router."+destType+".eventOrderKeyThreshold", "Router.eventOrderKeyThreshold") rt.eventOrderDisabledStateDuration = config.GetReloadableDurationVar(20, time.Minute, "Router."+destType+".eventOrderDisabledStateDuration", "Router.eventOrderDisabledStateDuration") rt.eventOrderHalfEnabledStateDuration = config.GetReloadableDurationVar(10, time.Minute, "Router."+destType+".eventOrderHalfEnabledStateDuration", "Router.eventOrderHalfEnabledStateDuration") + rt.reportJobsdbPayload = config.GetReloadableBoolVar(true, "Router."+destType+".reportJobsdbPayload", "Router.reportJobsdbPayload") statTags := stats.Tags{"destType": rt.destType} rt.tracer = stats.Default.NewTracer("router") diff --git a/router/router_test.go b/router/router_test.go index 74ac5dd9db..32376862e6 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -1303,7 +1303,7 @@ var _ = Describe("router", func() { Expect(metrics).To(HaveLen(1)) Expect(metrics[0].StatusDetail.StatusCode).To(Equal(200)) Expect(metrics[0].StatusDetail.Status).To(Equal(jobsdb.Succeeded.State)) - Expect(metrics[0].StatusDetail.SampleEvent).To(Equal(json.RawMessage(`{"message": "some transformed message"}`))) + Expect(metrics[0].StatusDetail.SampleEvent).To(Equal(toRetryJobsList[0].EventPayload)) return nil }, ) @@ -1458,7 +1458,7 @@ var _ = Describe("router", func() { Expect(metrics[0].StatusDetail.StatusCode).To(Equal(500)) Expect(metrics[0].StatusDetail.Status).To(Equal(jobsdb.Failed.State)) Expect(metrics[0].StatusDetail.SampleEvent).To(Equal(json.RawMessage(gaPayload))) - Expect(metrics[0].StatusDetail.SampleResponse).To(ContainSubstring(`failureStage":"RudderStack Transformation Error"`)) + Expect(metrics[0].StatusDetail.SampleResponse).To(ContainSubstring(`"routerSubStage":"router_dest_transformer"`)) return nil }, diff --git a/router/worker.go b/router/worker.go index e4bbf8cc71..61e8907141 100644 --- a/router/worker.go +++ b/router/worker.go @@ -588,6 +588,11 @@ func (w *worker) processDestinationJobs() { respBody := strings.Join(respBodyArr, " ") respStatusCodes, respBodys = w.prepareResponsesForJobs(&destinationJob, respStatusCode, respBody) } + stats.Default.NewTaggedStat("router_delivery_payload_size_bytes", stats.HistogramType, stats.Tags{ + "destType": w.rt.destType, + "workspaceID": destinationJob.JobMetadataArray[0].WorkspaceID, + "destinationID": destinationJob.JobMetadataArray[0].DestinationID, + }).Observe(float64(len(destinationJob.Message))) } } ch <- struct{}{} @@ -985,13 +990,9 @@ func (w *worker) postStatusOnResponseQ(respStatusCode int, payload json.RawMessa } } - inputPayload := payload - switch errorAt { - case routerutils.ERROR_AT_TF: - inputPayload = destinationJobMetadata.JobT.EventPayload - status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "failureStage", "RudderStack Transformation Error") - default: // includes ERROR_AT_DEL, ERROR_AT_CUST - status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "failureStage", "Destination Error") + inputPayload := destinationJobMetadata.JobT.EventPayload + if !w.rt.reportJobsdbPayload.Load() { // TODO: update default/remove this flag after monitoring the payload sizes + inputPayload = payload } status.ErrorResponse = routerutils.EnhanceJSON(status.ErrorResponse, "firstAttemptedAt", firstAttemptedAtTime.Format(misc.RFC3339Milli)) @@ -1012,6 +1013,18 @@ func (w *worker) postStatusOnResponseQ(respStatusCode int, payload json.RawMessa } return } + if !isSuccessStatus(respStatusCode) { + switch errorAt { + case routerutils.ERROR_AT_TF: + inputPayload = destinationJobMetadata.JobT.EventPayload + status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "routerSubStage", "router_dest_transformer") + default: // includes ERROR_AT_DEL, ERROR_AT_CUST + status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "routerSubStage", "router_dest_delivery") + } + // TODO: update after observing the sizes of the payloads + status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "payloadStage", "router_input") + } + // Saving payload to DB only // 1. if job failed and // 2. if router job undergoes batching or dest transform. diff --git a/runner/buckets.go b/runner/buckets.go index 5deb831076..69df30b85e 100644 --- a/runner/buckets.go +++ b/runner/buckets.go @@ -128,5 +128,9 @@ var ( // 1microsecond, 2.5microsecond, 5microsecond, 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s 0.00001, 0.00025, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, }, + "router_delivery_payload_size_bytes": { + float64(1 * bytesize.KB), float64(10 * bytesize.KB), float64(100 * bytesize.KB), + float64(1 * bytesize.MB), float64(3 * bytesize.MB), float64(5 * bytesize.MB), float64(10 * bytesize.MB), + }, } ) diff --git a/utils/types/reporting_types.go b/utils/types/reporting_types.go index 1f7a529f17..c07cbe9d45 100644 --- a/utils/types/reporting_types.go +++ b/utils/types/reporting_types.go @@ -23,7 +23,7 @@ const ( const MaxLengthExceeded = ":max-length-exceeded:" -var ( +const ( DiffStatus = "diff" // Module names