Skip to content

Commit

Permalink
Merge branch 'master' into chore.playtomic-feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Sep 22, 2024
2 parents 56ccf0c + cc74f3c commit 863781d
Show file tree
Hide file tree
Showing 77 changed files with 4,513 additions and 2,536 deletions.
2 changes: 1 addition & 1 deletion warehouse/integrations/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (as *AzureSynapse) loadUserTables(ctx context.Context) (errorMap map[string
select top 1 %[1]q from %[2]s
where x.id = %[2]s.id
and %[1]q is not null
order by X.received_at desc
order by received_at desc
)
end as %[1]q`, colName, as.Namespace+"."+unionStagingTableName)
// IGNORE NULLS only supported in Azure SQL edge, in which case the query can be shortened to below
Expand Down
76 changes: 62 additions & 14 deletions warehouse/integrations/azure-synapse/azure_synapse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"

"go.uber.org/mock/gomock"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down Expand Up @@ -64,23 +64,64 @@ func TestIntegration(t *testing.T) {
httpPort, err := kithelper.GetFreePort()
require.NoError(t, err)

c := testcompose.New(t, compose.FilePaths([]string{"testdata/docker-compose.yml", "../testdata/docker-compose.jobsdb.yml", "../testdata/docker-compose.minio.yml"}))
c := testcompose.New(t, compose.FilePaths([]string{"testdata/docker-compose.yml", "../testdata/docker-compose.jobsdb.yml", "../testdata/docker-compose.minio.yml", "../testdata/docker-compose.transformer.yml"}))
c.Start(context.Background())

workspaceID := whutils.RandHex()
jobsDBPort := c.Port("jobsDb", 5432)
azureSynapsePort := c.Port("azure_synapse", 1433)
minioEndpoint := fmt.Sprintf("localhost:%d", c.Port("minio", 9000))
transformerURL := fmt.Sprintf("http://localhost:%d", c.Port("transformer", 9090))

jobsDB := whth.JobsDB(t, jobsDBPort)

expectedSchema := model.Schema{
"screens": {"context_source_id": "varchar", "user_id": "varchar", "sent_at": "datetimeoffset", "context_request_ip": "varchar", "original_timestamp": "datetimeoffset", "url": "varchar", "context_source_type": "varchar", "_between": "varchar", "timestamp": "datetimeoffset", "context_ip": "varchar", "context_destination_type": "varchar", "received_at": "datetimeoffset", "title": "varchar", "uuid_ts": "datetimeoffset", "context_destination_id": "varchar", "name": "varchar", "id": "varchar", "_as": "varchar"},
"identifies": {"context_ip": "varchar", "context_destination_id": "varchar", "email": "varchar", "context_request_ip": "varchar", "sent_at": "datetimeoffset", "uuid_ts": "datetimeoffset", "_as": "varchar", "logins": "bigint", "context_source_type": "varchar", "context_traits_logins": "bigint", "name": "varchar", "context_destination_type": "varchar", "_between": "varchar", "id": "varchar", "timestamp": "datetimeoffset", "received_at": "datetimeoffset", "user_id": "varchar", "context_traits_email": "varchar", "context_traits_as": "varchar", "context_traits_name": "varchar", "original_timestamp": "datetimeoffset", "context_traits_between": "varchar", "context_source_id": "varchar"},
"users": {"context_traits_name": "varchar", "context_traits_between": "varchar", "context_request_ip": "varchar", "context_traits_logins": "bigint", "context_destination_id": "varchar", "email": "varchar", "logins": "bigint", "_as": "varchar", "context_source_id": "varchar", "uuid_ts": "datetimeoffset", "context_source_type": "varchar", "context_traits_email": "varchar", "name": "varchar", "id": "varchar", "_between": "varchar", "context_ip": "varchar", "received_at": "datetimeoffset", "sent_at": "datetimeoffset", "context_traits_as": "varchar", "context_destination_type": "varchar", "timestamp": "datetimeoffset", "original_timestamp": "datetimeoffset"},
"product_track": {"review_id": "varchar", "context_source_id": "varchar", "user_id": "varchar", "timestamp": "datetimeoffset", "uuid_ts": "datetimeoffset", "review_body": "varchar", "context_source_type": "varchar", "_as": "varchar", "_between": "varchar", "id": "varchar", "rating": "bigint", "event": "varchar", "original_timestamp": "datetimeoffset", "context_destination_type": "varchar", "context_ip": "varchar", "context_destination_id": "varchar", "sent_at": "datetimeoffset", "received_at": "datetimeoffset", "event_text": "varchar", "product_id": "varchar", "context_request_ip": "varchar"},
"tracks": {"original_timestamp": "datetimeoffset", "context_destination_id": "varchar", "event": "varchar", "context_request_ip": "varchar", "uuid_ts": "datetimeoffset", "context_destination_type": "varchar", "user_id": "varchar", "sent_at": "datetimeoffset", "context_source_type": "varchar", "context_ip": "varchar", "timestamp": "datetimeoffset", "received_at": "datetimeoffset", "context_source_id": "varchar", "event_text": "varchar", "id": "varchar"},
"aliases": {"context_request_ip": "varchar", "context_destination_type": "varchar", "context_destination_id": "varchar", "previous_id": "varchar", "context_ip": "varchar", "sent_at": "datetimeoffset", "id": "varchar", "uuid_ts": "datetimeoffset", "timestamp": "datetimeoffset", "original_timestamp": "datetimeoffset", "context_source_id": "varchar", "user_id": "varchar", "context_source_type": "varchar", "received_at": "datetimeoffset"},
"pages": {"name": "varchar", "url": "varchar", "id": "varchar", "timestamp": "datetimeoffset", "title": "varchar", "user_id": "varchar", "context_source_id": "varchar", "context_source_type": "varchar", "original_timestamp": "datetimeoffset", "context_request_ip": "varchar", "received_at": "datetimeoffset", "_between": "varchar", "context_destination_type": "varchar", "uuid_ts": "datetimeoffset", "context_destination_id": "varchar", "sent_at": "datetimeoffset", "context_ip": "varchar", "_as": "varchar"},
"groups": {"_as": "varchar", "user_id": "varchar", "context_destination_type": "varchar", "sent_at": "datetimeoffset", "context_source_type": "varchar", "received_at": "datetimeoffset", "context_ip": "varchar", "industry": "varchar", "timestamp": "datetimeoffset", "group_id": "varchar", "uuid_ts": "datetimeoffset", "context_source_id": "varchar", "context_request_ip": "varchar", "_between": "varchar", "original_timestamp": "datetimeoffset", "name": "varchar", "_plan": "varchar", "context_destination_id": "varchar", "employees": "bigint", "id": "varchar"},
}
userIDFormat := "userId_azure_synapse"
userIDSQL := "LEFT(user_id, CHARINDEX('_', user_id, CHARINDEX('_', user_id, CHARINDEX('_', user_id) + 1) + 1) - 1)"
uuidTSSQL := "LEFT(uuid_ts, 10)"

testcase := []struct {
name string
tables []string
name string
tables []string
verifySchema func(t *testing.T, db *sql.DB, namespace string)
verifyRecords func(t *testing.T, db *sql.DB, sourceID, destinationID, namespace string)
}{
{
name: "Upload Job",
tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"},
verifySchema: func(t *testing.T, db *sql.DB, namespace string) {
t.Helper()
schema := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT table_name, column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_schema = '%s';`, namespace))
require.Equal(t, expectedSchema, whth.ConvertRecordsToSchema(schema))
},
verifyRecords: func(t *testing.T, db *sql.DB, sourceID, destinationID, namespace string) {
t.Helper()
identifiesRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT %s, %s, context_traits_logins, _as, name, logins, email, original_timestamp, context_ip, context_traits_as, timestamp, received_at, context_destination_type, sent_at, context_source_type, context_traits_between, context_source_id, context_traits_name, context_request_ip, _between, context_traits_email, context_destination_id, id FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "identifies"))
require.ElementsMatch(t, identifiesRecords, whth.UploadJobIdentifiesRecords(userIDFormat, sourceID, destinationID, destType))
usersRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_source_id, context_destination_type, context_request_ip, context_traits_name, context_traits_between, _as, logins, sent_at, context_traits_logins, context_ip, _between, context_traits_email, timestamp, context_destination_id, email, context_traits_as, context_source_type, LEFT(id, CHARINDEX('_', id, CHARINDEX('_', id, CHARINDEX('_', id) + 1) + 1) - 1), %s, received_at, name, original_timestamp FROM %q.%q ORDER BY id;`, uuidTSSQL, namespace, "users"))
require.ElementsMatch(t, usersRecords, whth.UploadJobUsersRecords(userIDFormat, sourceID, destinationID, destType))
tracksRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT original_timestamp, context_destination_id, context_destination_type, %s, context_source_type, timestamp, id, event, sent_at, context_ip, event_text, context_source_id, context_request_ip, received_at, %s FROM %q.%q ORDER BY id;`, uuidTSSQL, userIDSQL, namespace, "tracks"))
require.ElementsMatch(t, tracksRecords, whth.UploadJobTracksRecords(userIDFormat, sourceID, destinationID, destType))
productTrackRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT timestamp, %s, product_id, received_at, context_source_id, sent_at, context_source_type, context_ip, context_destination_type, original_timestamp, context_request_ip, context_destination_id, %s, _as, review_body, _between, review_id, event_text, id, event, rating FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "product_track"))
require.ElementsMatch(t, productTrackRecords, whth.UploadJobProductTrackRecords(userIDFormat, sourceID, destinationID, destType))
pagesRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT %s, context_source_id, id, title, timestamp, context_source_type, _as, received_at, context_destination_id, context_ip, context_destination_type, name, original_timestamp, _between, context_request_ip, sent_at, url, %s FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "pages"))
require.ElementsMatch(t, pagesRecords, whth.UploadJobPagesRecords(userIDFormat, sourceID, destinationID, destType))
screensRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_destination_type, url, context_source_type, title, original_timestamp, %s, _between, context_ip, name, context_request_ip, %s, context_source_id, id, received_at, context_destination_id, timestamp, sent_at, _as FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "screens"))
require.ElementsMatch(t, screensRecords, whth.UploadJobScreensRecords(userIDFormat, sourceID, destinationID, destType))
aliasesRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_source_id, context_destination_id, context_ip, sent_at, id, %s, %s, previous_id, original_timestamp, context_source_type, received_at, context_destination_type, context_request_ip, timestamp FROM %q.%q ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "aliases"))
require.ElementsMatch(t, aliasesRecords, whth.UploadJobAliasesRecords(userIDFormat, sourceID, destinationID, destType))
groupsRecords := whth.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_destination_type, id, _between, _plan, original_timestamp, %s, context_source_id, sent_at, %s, group_id, industry, context_request_ip, context_source_type, timestamp, employees, _as, context_destination_id, received_at, name, context_ip FROM %q.%q ORDER BY id;`, uuidTSSQL, userIDSQL, namespace, "groups"))
require.ElementsMatch(t, groupsRecords, whth.UploadJobGroupsRecords(userIDFormat, sourceID, destinationID, destType))
},
},
}

Expand All @@ -93,7 +134,7 @@ func TestIntegration(t *testing.T) {
namespace = whth.RandSchema(destType)
)

destinationBuilder := backendconfigtest.NewDestinationBuilder(destType).
destination := backendconfigtest.NewDestinationBuilder(destType).
WithID(destinationID).
WithRevisionID(destinationID).
WithConfigOption("host", host).
Expand All @@ -110,15 +151,16 @@ func TestIntegration(t *testing.T) {
WithConfigOption("useSSL", false).
WithConfigOption("endPoint", minioEndpoint).
WithConfigOption("useRudderStorage", false).
WithConfigOption("syncFrequency", "30")
WithConfigOption("syncFrequency", "30").
Build()

workspaceConfig := backendconfigtest.NewConfigBuilder().
WithSource(
backendconfigtest.NewSourceBuilder().
WithID(sourceID).
WithWriteKey(writeKey).
WithWorkspaceID(workspaceID).
WithConnection(destinationBuilder.Build()).
WithConnection(destination).
Build(),
).
WithWorkspaceID(workspaceID).
Expand Down Expand Up @@ -167,10 +209,10 @@ func TestIntegration(t *testing.T) {
JobsDB: jobsDB,
HTTPPort: httpPort,
Client: sqlClient,
JobRunID: misc.FastUUID().String(),
TaskRunID: misc.FastUUID().String(),
StagingFilePath: "testdata/upload-job.staging-1.json",
EventsFilePath: "../testdata/upload-job.events-1.json",
UserID: whth.GetUserId(destType),
TransformerURL: transformerURL,
Destination: destination,
}
ts1.VerifyEvents(t)

Expand All @@ -187,12 +229,18 @@ func TestIntegration(t *testing.T) {
JobsDB: jobsDB,
HTTPPort: httpPort,
Client: sqlClient,
JobRunID: misc.FastUUID().String(),
TaskRunID: misc.FastUUID().String(),
StagingFilePath: "testdata/upload-job.staging-2.json",
EventsFilePath: "../testdata/upload-job.events-2.json",
UserID: whth.GetUserId(destType),
TransformerURL: transformerURL,
Destination: destination,
}
ts2.VerifyEvents(t)

t.Log("verifying schema")
tc.verifySchema(t, db, namespace)

t.Log("verifying records")
tc.verifyRecords(t, db, sourceID, destinationID, namespace)
})
}
})
Expand Down
Loading

0 comments on commit 863781d

Please sign in to comment.