diff --git a/processor/processor.go b/processor/processor.go index 6b65d73395..f25f66bf8e 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -961,6 +961,7 @@ func (proc *Handle) makeCommonMetadataFromSingularEvent(singularEvent types.Sing commonMetadata := transformer.Metadata{} commonMetadata.SourceID = source.ID commonMetadata.SourceName = source.Name + commonMetadata.OriginalSourceID = source.OriginalID commonMetadata.WorkspaceID = source.WorkspaceID commonMetadata.Namespace = proc.namespace commonMetadata.InstanceID = proc.instanceID @@ -992,6 +993,7 @@ func enhanceWithMetadata(commonMetadata *transformer.Metadata, event *transforme metadata.SourceType = commonMetadata.SourceType metadata.SourceCategory = commonMetadata.SourceCategory metadata.SourceID = commonMetadata.SourceID + metadata.OriginalSourceID = commonMetadata.OriginalSourceID metadata.SourceName = commonMetadata.SourceName metadata.WorkspaceID = commonMetadata.WorkspaceID metadata.Namespace = commonMetadata.Namespace @@ -2495,6 +2497,7 @@ func (proc *Handle) transformSrcDest( commonMetaData := &transformer.Metadata{ SourceID: sourceID, SourceName: sourceName, + OriginalSourceID: eventList[0].Metadata.OriginalSourceID, SourceType: eventList[0].Metadata.SourceType, SourceCategory: eventList[0].Metadata.SourceCategory, WorkspaceID: workspaceID, diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index adbed020a2..d46fdc8611 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -47,6 +47,7 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary type Metadata struct { SourceID string `json:"sourceId"` SourceName string `json:"sourceName"` + OriginalSourceID string `json:"originalSourceId"` WorkspaceID string `json:"workspaceId"` Namespace string `json:"namespace"` InstanceID string `json:"instanceId"` @@ -248,6 +249,15 @@ func (trans *handle) transform( if len(clientEvents) == 0 { return Response{} } + // flip sourceID and originalSourceID if it's a replay source for the purpose of any user transformation + // flip back afterwards + for _, clientEvent := range clientEvents { + if clientEvent.Metadata.OriginalSourceID != "" { + originalSourceID := clientEvent.Metadata.OriginalSourceID + clientEvent.Metadata.OriginalSourceID = clientEvent.Metadata.SourceID + clientEvent.Metadata.SourceID = originalSourceID + } + } sTags := stats.Tags{ "dest_type": clientEvents[0].Destination.DestinationDefinition.Name, "dest_id": clientEvents[0].Destination.ID, @@ -306,6 +316,11 @@ func (trans *handle) transform( // Transform is one to many mapping so returned // response for each is an array. We flatten it out for _, transformerResponse := range batch { + if transformerResponse.Metadata.OriginalSourceID != "" { + originalSourceID := transformerResponse.Metadata.SourceID + transformerResponse.Metadata.SourceID = transformerResponse.Metadata.OriginalSourceID + transformerResponse.Metadata.OriginalSourceID = originalSourceID + } switch transformerResponse.StatusCode { case http.StatusOK: outClientEvents = append(outClientEvents, transformerResponse)