Skip to content

Commit

Permalink
fix: bigquery validations for partition column and type (#5168)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Oct 4, 2024
1 parent 3ed1302 commit a2af47a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 32 deletions.
13 changes: 12 additions & 1 deletion warehouse/integrations/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,18 @@ func (bq *BigQuery) LoadTestTable(ctx context.Context, location, tableName strin
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false

outputTable := partitionedTable(tableName, bq.now().Format("2006-01-02"))
partitionDate, err := bq.partitionDate()
if err != nil {
return fmt.Errorf("partition date: %w", err)
}

var outputTable string
if bq.avoidPartitionDecorator() {
outputTable = tableName
} else {
outputTable = partitionedTable(tableName, partitionDate)
}

loader := bq.db.Dataset(bq.namespace).Table(outputTable).LoaderFrom(gcsRef)

job, err := loader.Run(ctx)
Expand Down
108 changes: 77 additions & 31 deletions warehouse/integrations/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,40 +641,86 @@ func TestIntegration(t *testing.T) {
})

t.Run("Validations", func(t *testing.T) {
ctx := context.Background()
namespace := whth.RandSchema(destType)

db, err := bigquery.NewClient(ctx,
credentials.ProjectID,
option.WithCredentialsJSON([]byte(credentials.Credentials)),
)
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })
t.Cleanup(func() {
dropSchema(t, db, namespace)
})

dest := backendconfig.DestinationT{
ID: "test_destination_id",
Config: map[string]interface{}{
"project": credentials.ProjectID,
"location": credentials.Location,
"bucketName": credentials.BucketName,
"credentials": credentials.Credentials,
"prefix": "",
"namespace": namespace,
"syncFrequency": "30",
testCases := []struct {
name string
configOverride map[string]any
}{
{
name: "default partitionColumn and partitionType",
},
{
name: "partitionColumn: _PARTITIONTIME, partitionType: day",
configOverride: map[string]any{
"partitionColumn": "_PARTITIONTIME",
"partitionType": "day",
},
},
{
name: "partitionColumn: _PARTITIONTIME, partitionType: hour",
configOverride: map[string]any{
"partitionColumn": "_PARTITIONTIME",
"partitionType": "hour",
},
},
{
name: "partitionColumn: received_at, partitionType: hour",
configOverride: map[string]any{
"partitionColumn": "received_at",
"partitionType": "hour",
},
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
ID: "1UmeD7xhVGHsPDEHoCiSPEGytS3",
Name: "BQ",
DisplayName: "BigQuery",
{
name: "partitionColumn: received_at, partitionType: day",
configOverride: map[string]any{
"partitionColumn": "loaded_at",
"partitionType": "day",
},
},
Name: "bigquery-integration",
Enabled: true,
RevisionID: "test_destination_id",
}
whth.VerifyConfigurationTest(t, dest)

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
namespace := whth.RandSchema(destType)

db, err := bigquery.NewClient(ctx,
credentials.ProjectID,
option.WithCredentialsJSON([]byte(credentials.Credentials)),
)
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })
t.Cleanup(func() {
dropSchema(t, db, namespace)
})

conf := map[string]interface{}{
"project": credentials.ProjectID,
"location": credentials.Location,
"bucketName": credentials.BucketName,
"credentials": credentials.Credentials,
"prefix": "",
"namespace": namespace,
"syncFrequency": "30",
}
for k, v := range tc.configOverride {
conf[k] = v
}

dest := backendconfig.DestinationT{
ID: "test_destination_id",
Config: conf,
DestinationDefinition: backendconfig.DestinationDefinitionT{
ID: "1UmeD7xhVGHsPDEHoCiSPEGytS3",
Name: "BQ",
DisplayName: "BigQuery",
},
Name: "bigquery-integration",
Enabled: true,
RevisionID: "test_destination_id",
}
whth.VerifyConfigurationTest(t, dest)
})
}
})

t.Run("Load Table", func(t *testing.T) {
Expand Down

0 comments on commit a2af47a

Please sign in to comment.