From caeb8bab8a465bbf6b55d6142529e5b701840afe Mon Sep 17 00:00:00 2001 From: achettyiitr Date: Fri, 4 Oct 2024 12:20:36 +0530 Subject: [PATCH] fix: bigquery validations for partition column and type --- warehouse/integrations/bigquery/bigquery.go | 13 ++- .../integrations/bigquery/bigquery_test.go | 108 +++++++++++++----- 2 files changed, 89 insertions(+), 32 deletions(-) diff --git a/warehouse/integrations/bigquery/bigquery.go b/warehouse/integrations/bigquery/bigquery.go index b5bd48ffb5..fa2a797810 100644 --- a/warehouse/integrations/bigquery/bigquery.go +++ b/warehouse/integrations/bigquery/bigquery.go @@ -1195,7 +1195,18 @@ func (bq *BigQuery) LoadTestTable(ctx context.Context, location, tableName strin gcsRef.MaxBadRecords = 0 gcsRef.IgnoreUnknownValues = false - outputTable := partitionedTable(tableName, bq.now().Format("2006-01-02")) + partitionDate, err := bq.partitionDate() + if err != nil { + return fmt.Errorf("partition date: %w", err) + } + + var outputTable string + if bq.avoidPartitionDecorator() { + outputTable = tableName + } else { + outputTable = partitionedTable(tableName, partitionDate) + } + loader := bq.db.Dataset(bq.namespace).Table(outputTable).LoaderFrom(gcsRef) job, err := loader.Run(ctx) diff --git a/warehouse/integrations/bigquery/bigquery_test.go b/warehouse/integrations/bigquery/bigquery_test.go index 1172bca82a..4f09cb41b8 100644 --- a/warehouse/integrations/bigquery/bigquery_test.go +++ b/warehouse/integrations/bigquery/bigquery_test.go @@ -641,40 +641,86 @@ func TestIntegration(t *testing.T) { }) t.Run("Validations", func(t *testing.T) { - ctx := context.Background() - namespace := whth.RandSchema(destType) - - db, err := bigquery.NewClient(ctx, - credentials.ProjectID, - option.WithCredentialsJSON([]byte(credentials.Credentials)), - ) - require.NoError(t, err) - t.Cleanup(func() { _ = db.Close() }) - t.Cleanup(func() { - dropSchema(t, db, namespace) - }) - - dest := backendconfig.DestinationT{ - ID: "test_destination_id", - Config: map[string]interface{}{ - "project": credentials.ProjectID, - "location": credentials.Location, - "bucketName": credentials.BucketName, - "credentials": credentials.Credentials, - "prefix": "", - "namespace": namespace, - "syncFrequency": "30", + testCases := []struct { + name string + configOverride map[string]any + }{ + { + name: "default partitionColumn and partitionType", + }, + { + name: "partitionColumn: _PARTITIONTIME, partitionType: day", + configOverride: map[string]any{ + "partitionColumn": "_PARTITIONTIME", + "partitionType": "day", + }, + }, + { + name: "partitionColumn: _PARTITIONTIME, partitionType: hour", + configOverride: map[string]any{ + "partitionColumn": "_PARTITIONTIME", + "partitionType": "hour", + }, + }, + { + name: "partitionColumn: received_at, partitionType: hour", + configOverride: map[string]any{ + "partitionColumn": "received_at", + "partitionType": "hour", + }, }, - DestinationDefinition: backendconfig.DestinationDefinitionT{ - ID: "1UmeD7xhVGHsPDEHoCiSPEGytS3", - Name: "BQ", - DisplayName: "BigQuery", + { + name: "partitionColumn: received_at, partitionType: day", + configOverride: map[string]any{ + "partitionColumn": "loaded_at", + "partitionType": "day", + }, }, - Name: "bigquery-integration", - Enabled: true, - RevisionID: "test_destination_id", } - whth.VerifyConfigurationTest(t, dest) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + namespace := whth.RandSchema(destType) + + db, err := bigquery.NewClient(ctx, + credentials.ProjectID, + option.WithCredentialsJSON([]byte(credentials.Credentials)), + ) + require.NoError(t, err) + t.Cleanup(func() { _ = db.Close() }) + t.Cleanup(func() { + dropSchema(t, db, namespace) + }) + + conf := map[string]interface{}{ + "project": credentials.ProjectID, + "location": credentials.Location, + "bucketName": credentials.BucketName, + "credentials": credentials.Credentials, + "prefix": "", + "namespace": namespace, + "syncFrequency": "30", + } + for k, v := range tc.configOverride { + conf[k] = v + } + + dest := backendconfig.DestinationT{ + ID: "test_destination_id", + Config: conf, + DestinationDefinition: backendconfig.DestinationDefinitionT{ + ID: "1UmeD7xhVGHsPDEHoCiSPEGytS3", + Name: "BQ", + DisplayName: "BigQuery", + }, + Name: "bigquery-integration", + Enabled: true, + RevisionID: "test_destination_id", + } + whth.VerifyConfigurationTest(t, dest) + }) + } }) t.Run("Load Table", func(t *testing.T) {