Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: flipping SourceID & OriginalSourceID during transformation #4887

Merged
merged 5 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ func (proc *Handle) makeCommonMetadataFromSingularEvent(singularEvent types.Sing
commonMetadata := transformer.Metadata{}
commonMetadata.SourceID = source.ID
commonMetadata.SourceName = source.Name
commonMetadata.OriginalSourceID = source.OriginalID
commonMetadata.WorkspaceID = source.WorkspaceID
commonMetadata.Namespace = proc.namespace
commonMetadata.InstanceID = proc.instanceID
Expand Down Expand Up @@ -992,6 +993,7 @@ func enhanceWithMetadata(commonMetadata *transformer.Metadata, event *transforme
metadata.SourceType = commonMetadata.SourceType
metadata.SourceCategory = commonMetadata.SourceCategory
metadata.SourceID = commonMetadata.SourceID
metadata.OriginalSourceID = commonMetadata.OriginalSourceID
metadata.SourceName = commonMetadata.SourceName
metadata.WorkspaceID = commonMetadata.WorkspaceID
metadata.Namespace = commonMetadata.Namespace
Expand Down Expand Up @@ -2484,6 +2486,7 @@ func (proc *Handle) transformSrcDest(
commonMetaData := &transformer.Metadata{
SourceID: sourceID,
SourceName: sourceName,
OriginalSourceID: eventList[0].Metadata.OriginalSourceID,
SourceType: eventList[0].Metadata.SourceType,
SourceCategory: eventList[0].Metadata.SourceCategory,
WorkspaceID: workspaceID,
Expand Down
29 changes: 28 additions & 1 deletion processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
type Metadata struct {
SourceID string `json:"sourceId"`
SourceName string `json:"sourceName"`
OriginalSourceID string `json:"originalSourceId"`
WorkspaceID string `json:"workspaceId"`
Namespace string `json:"namespace"`
InstanceID string `json:"instanceId"`
Expand Down Expand Up @@ -230,7 +231,33 @@

// UserTransform function is used to invoke user transformer API
func (trans *handle) UserTransform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response {
return trans.transform(ctx, clientEvents, trans.userTransformURL(), batchSize, userTransformerStage)
// flip sourceID and originalSourceID if it's a replay source for the purpose of any user transformation
// flip back afterwards
for _, clientEvent := range clientEvents {
if clientEvent.Metadata.OriginalSourceID != "" {
originalSourceID := clientEvent.Metadata.OriginalSourceID
clientEvent.Metadata.OriginalSourceID = clientEvent.Metadata.SourceID
clientEvent.Metadata.SourceID = originalSourceID

Check warning on line 240 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L238-L240

Added lines #L238 - L240 were not covered by tests
}
}
response := trans.transform(ctx, clientEvents, trans.userTransformURL(), batchSize, userTransformerStage)
for _, event := range response.Events {
// flip back sourceID and original sourceID
if event.Metadata.OriginalSourceID != "" {
originalSourceID := event.Metadata.SourceID
event.Metadata.SourceID = event.Metadata.OriginalSourceID
event.Metadata.OriginalSourceID = originalSourceID

Check warning on line 249 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L247-L249

Added lines #L247 - L249 were not covered by tests
}
}
for _, event := range response.FailedEvents {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both these flip backs can be moved to the (*handle)transform(...) Response method at the end where we populate the Response(failedEvents and outClientEvents).

// flip back sourceID and original sourceID
if event.Metadata.OriginalSourceID != "" {
originalSourceID := event.Metadata.SourceID
event.Metadata.SourceID = event.Metadata.OriginalSourceID
event.Metadata.OriginalSourceID = originalSourceID

Check warning on line 257 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L255-L257

Added lines #L255 - L257 were not covered by tests
}
}
return response
}

// Validate function is used to invoke tracking plan validation API
Expand Down
Loading