diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 8f872a9b77..e6adf207f9 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -168,6 +168,11 @@ jobs: go-version-file: 'go.mod' - run: go version - run: go mod download + - name: Login to DockerHub + uses: docker/login-action@v3.3.0 + with: + username: rudderlabs + password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Package Unit [ ${{ matrix.package }} ] env: TEST_KAFKA_CONFLUENT_CLOUD_HOST: ${{ secrets.TEST_KAFKA_CONFLUENT_CLOUD_HOST }} diff --git a/integration_test/warehouse/integration_test.go b/integration_test/warehouse/integration_test.go index 2b64911686..863d2f99d3 100644 --- a/integration_test/warehouse/integration_test.go +++ b/integration_test/warehouse/integration_test.go @@ -14,8 +14,10 @@ import ( "strings" "sync" "testing" + "text/template" "time" + "github.com/mitchellh/mapstructure" "go.uber.org/atomic" "go.uber.org/mock/gomock" @@ -33,6 +35,7 @@ import ( kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres" + "github.com/rudderlabs/rudder-server/admin" "github.com/rudderlabs/rudder-server/app" backendconfig "github.com/rudderlabs/rudder-server/backend-config" @@ -40,7 +43,9 @@ import ( "github.com/rudderlabs/rudder-server/jobsdb" mocksApp "github.com/rudderlabs/rudder-server/mocks/app" mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config" + "github.com/rudderlabs/rudder-server/processor/transformer" "github.com/rudderlabs/rudder-server/services/controlplane/identity" + "github.com/rudderlabs/rudder-server/testhelper/backendconfigtest" "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/pubsub" @@ -1499,6 +1504,302 @@ func TestUploads(t *testing.T) { {A: "status", B: exportedData}, }...) }) + t.Run("destination transformation", func(t *testing.T) { + c := testcompose.New(t, compose.FilePaths([]string{"testdata/docker-compose.transformer.yml"})) + c.Start(context.Background()) + + transformerURL := fmt.Sprintf("http://localhost:%d", c.Port("transformer", 9090)) + + conf := config.New() + conf.Set("DEST_TRANSFORM_URL", transformerURL) + conf.Set("USER_TRANSFORM_URL", transformerURL) + + type output struct { + Metadata struct { + Table string `mapstructure:"table"` + Columns map[string]string `mapstructure:"columns"` + } `mapstructure:"metadata"` + Data map[string]any `mapstructure:"data"` + } + + t.Run("allowUsersContextTraits", func(t *testing.T) { + testcases := []struct { + name string + configOverride map[string]any + validateEvents func(t *testing.T, events []transformer.TransformerResponse) + }{ + { + name: "with allowUsersContextTraits=true", + configOverride: map[string]any{ + "allowUsersContextTraits": true, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var identifyEvent output + err := mapstructure.Decode(events[0].Output, &identifyEvent) + require.NoError(t, err) + require.Equal(t, "identifies", identifyEvent.Metadata.Table) + require.Contains(t, identifyEvent.Metadata.Columns, "firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "lastname") + require.Equal(t, "Mickey", identifyEvent.Data["firstname"]) + require.Equal(t, "Mouse", identifyEvent.Data["lastname"]) + require.Equal(t, "Mickey", identifyEvent.Data["context_traits_firstname"]) + + var userEvent output + err = mapstructure.Decode(events[1].Output, &userEvent) + require.NoError(t, err) + require.Equal(t, "users", userEvent.Metadata.Table) + require.Contains(t, userEvent.Metadata.Columns, "firstname") + require.Contains(t, userEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, userEvent.Metadata.Columns, "lastname") + require.Equal(t, "Mickey", userEvent.Data["firstname"]) + require.Equal(t, "Mouse", userEvent.Data["lastname"]) + require.Equal(t, "Mickey", userEvent.Data["context_traits_firstname"]) + }, + }, + { + name: "with allowUsersContextTraits=false", + configOverride: map[string]any{ + "allowUsersContextTraits": false, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var identifyEvent output + err := mapstructure.Decode(events[0].Output, &identifyEvent) + require.NoError(t, err) + require.Equal(t, "identifies", identifyEvent.Metadata.Table) + require.NotContains(t, identifyEvent.Metadata.Columns, "firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "lastname") + require.NotContains(t, identifyEvent.Data, "firstname") + require.Equal(t, "Mouse", identifyEvent.Data["lastname"]) + require.Equal(t, "Mickey", identifyEvent.Data["context_traits_firstname"]) + + var userEvent output + err = mapstructure.Decode(events[1].Output, &userEvent) + require.NoError(t, err) + require.Equal(t, "users", userEvent.Metadata.Table) + require.NotContains(t, userEvent.Metadata.Columns, "firstname") + require.Contains(t, userEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, userEvent.Metadata.Columns, "lastname") + require.NotContains(t, userEvent.Data, "firstname") + require.Equal(t, "Mouse", userEvent.Data["lastname"]) + require.Equal(t, "Mickey", userEvent.Data["context_traits_firstname"]) + }, + }, + { + name: "without allowUsersContextTraits", + configOverride: map[string]any{}, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var identifyEvent output + err := mapstructure.Decode(events[0].Output, &identifyEvent) + require.NoError(t, err) + require.Equal(t, "identifies", identifyEvent.Metadata.Table) + require.NotContains(t, identifyEvent.Metadata.Columns, "firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "lastname") + require.NotContains(t, identifyEvent.Data, "firstname") + require.Equal(t, "Mouse", identifyEvent.Data["lastname"]) + require.Equal(t, "Mickey", identifyEvent.Data["context_traits_firstname"]) + + var userEvent output + err = mapstructure.Decode(events[1].Output, &userEvent) + require.NoError(t, err) + require.Equal(t, "users", userEvent.Metadata.Table) + require.NotContains(t, userEvent.Metadata.Columns, "firstname") + require.Contains(t, userEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, userEvent.Metadata.Columns, "lastname") + require.NotContains(t, userEvent.Data, "firstname") + require.Equal(t, "Mouse", userEvent.Data["lastname"]) + require.Equal(t, "Mickey", userEvent.Data["context_traits_firstname"]) + }, + }, + } + + for _, tc := range testcases { + destinationBuilder := backendconfigtest.NewDestinationBuilder(whutils.BQ). + WithID(destinationID). + WithRevisionID(destinationID) + for k, v := range tc.configOverride { + destinationBuilder.WithConfigOption(k, v) + } + destination := destinationBuilder.Build() + + destinationJSON, err := json.Marshal(destination) + require.NoError(t, err) + + eventTemplate := ` + [ + { + "message": { + "context": { + "traits": { + "firstname": "Mickey" + } + }, + "traits": { + "lastname": "Mouse" + }, + "type": "identify", + "userId": "9bb5d4c2-a7aa-4a36-9efb-dd2b1aec5d33" + }, + "destination": {{.destination}} + } + ] +` + + tpl, err := template.New(uuid.New().String()).Parse(eventTemplate) + require.NoError(t, err) + + b := new(strings.Builder) + err = tpl.Execute(b, map[string]any{ + "destination": string(destinationJSON), + }) + require.NoError(t, err) + + var transformerEvents []transformer.TransformerEvent + err = json.Unmarshal([]byte(b.String()), &transformerEvents) + require.NoError(t, err) + + tr := transformer.NewTransformer(conf, logger.NOP, stats.Default) + response := tr.Transform(context.Background(), transformerEvents, 100) + require.Zero(t, len(response.FailedEvents)) + require.Len(t, response.Events, 2) + + tc.validateEvents(t, response.Events) + } + }) + t.Run("underscoreDivideNumbers", func(t *testing.T) { + testcases := []struct { + name string + configOverride map[string]any + validateEvents func(t *testing.T, events []transformer.TransformerResponse) + }{ + { + name: "with underscoreDivideNumbers=true", + configOverride: map[string]any{ + "underscoreDivideNumbers": true, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var trackOutput output + err := mapstructure.Decode(events[0].Output, &trackOutput) + require.NoError(t, err) + require.Equal(t, "tracks", trackOutput.Metadata.Table) + require.Contains(t, trackOutput.Metadata.Columns, "context_traits_attribute_v_3") + require.Equal(t, "button_clicked_v_2", trackOutput.Data["event"]) + require.Equal(t, "button clicked v2", trackOutput.Data["event_text"]) + require.Equal(t, "some value", trackOutput.Data["context_traits_attribute_v_3"]) + + var buttonClickedOutput output + err = mapstructure.Decode(events[1].Output, &buttonClickedOutput) + require.NoError(t, err) + require.Equal(t, "button_clicked_v_2", buttonClickedOutput.Metadata.Table) + require.Contains(t, buttonClickedOutput.Metadata.Columns, "context_traits_attribute_v_3") + require.Equal(t, "button_clicked_v_2", buttonClickedOutput.Data["event"]) + require.Equal(t, "button clicked v2", buttonClickedOutput.Data["event_text"]) + require.Equal(t, "some value", buttonClickedOutput.Data["context_traits_attribute_v_3"]) + }, + }, + { + name: "with underscoreDivideNumbers=false", + configOverride: map[string]any{ + "underscoreDivideNumbers": false, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var trackOutput output + err := mapstructure.Decode(events[0].Output, &trackOutput) + require.NoError(t, err) + require.Equal(t, "tracks", trackOutput.Metadata.Table) + require.Contains(t, trackOutput.Metadata.Columns, "context_traits_attribute_v3") + require.Equal(t, "button_clicked_v2", trackOutput.Data["event"]) + require.Equal(t, "button clicked v2", trackOutput.Data["event_text"]) + require.Equal(t, "some value", trackOutput.Data["context_traits_attribute_v3"]) + + var buttonClickedOutput output + err = mapstructure.Decode(events[1].Output, &buttonClickedOutput) + require.NoError(t, err) + require.Equal(t, "button_clicked_v2", buttonClickedOutput.Metadata.Table) + require.Contains(t, buttonClickedOutput.Metadata.Columns, "context_traits_attribute_v3") + require.Equal(t, "button_clicked_v2", buttonClickedOutput.Data["event"]) + require.Equal(t, "button clicked v2", buttonClickedOutput.Data["event_text"]) + require.Equal(t, "some value", buttonClickedOutput.Data["context_traits_attribute_v3"]) + }, + }, + { + name: "without underscoreDivideNumbers", + configOverride: map[string]any{}, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var trackOutput output + err := mapstructure.Decode(events[0].Output, &trackOutput) + require.NoError(t, err) + require.Equal(t, "tracks", trackOutput.Metadata.Table) + require.Contains(t, trackOutput.Metadata.Columns, "context_traits_attribute_v3") + require.Equal(t, "button_clicked_v2", trackOutput.Data["event"]) + require.Equal(t, "button clicked v2", trackOutput.Data["event_text"]) + require.Equal(t, "some value", trackOutput.Data["context_traits_attribute_v3"]) + + var buttonClickedOutput output + err = mapstructure.Decode(events[1].Output, &buttonClickedOutput) + require.NoError(t, err) + require.Equal(t, "button_clicked_v2", buttonClickedOutput.Metadata.Table) + require.Contains(t, buttonClickedOutput.Metadata.Columns, "context_traits_attribute_v3") + require.Equal(t, "button_clicked_v2", buttonClickedOutput.Data["event"]) + require.Equal(t, "button clicked v2", buttonClickedOutput.Data["event_text"]) + require.Equal(t, "some value", buttonClickedOutput.Data["context_traits_attribute_v3"]) + }, + }, + } + + for _, tc := range testcases { + destinationBuilder := backendconfigtest.NewDestinationBuilder(whutils.BQ). + WithID(destinationID). + WithRevisionID(destinationID) + for k, v := range tc.configOverride { + destinationBuilder.WithConfigOption(k, v) + } + destination := destinationBuilder.Build() + + destinationJSON, err := json.Marshal(destination) + require.NoError(t, err) + + eventTemplate := ` + [ + { + "message": { + "context": { + "traits": { + "attribute v3": "some value" + } + }, + "event": "button clicked v2", + "type": "track" + }, + "destination": {{.destination}} + } + ] +` + + tpl, err := template.New(uuid.New().String()).Parse(eventTemplate) + require.NoError(t, err) + + b := new(strings.Builder) + err = tpl.Execute(b, map[string]any{ + "destination": string(destinationJSON), + }) + require.NoError(t, err) + + var transformerEvents []transformer.TransformerEvent + err = json.Unmarshal([]byte(b.String()), &transformerEvents) + require.NoError(t, err) + + tr := transformer.NewTransformer(conf, logger.NOP, stats.Default) + response := tr.Transform(context.Background(), transformerEvents, 100) + require.Zero(t, len(response.FailedEvents)) + require.Len(t, response.Events, 2) + + tc.validateEvents(t, response.Events) + } + }) + }) } func runWarehouseServer( diff --git a/integration_test/warehouse/testdata/docker-compose.transformer.yml b/integration_test/warehouse/testdata/docker-compose.transformer.yml new file mode 100644 index 0000000000..55b164b0c3 --- /dev/null +++ b/integration_test/warehouse/testdata/docker-compose.transformer.yml @@ -0,0 +1,11 @@ +version: "3.9" + +services: + transformer: + image: "rudderstack/develop-rudder-transformer:chore.playtomic-feedback" + ports: + - "9090:9090" + healthcheck: + test: wget --no-verbose --tries=1 --spider http://0.0.0.0:9090/health || exit 1 + interval: 1s + retries: 25