Merge pull request #11 from prefeitura-rio/staging/tables-bindings
feat: materialize data to BigQuery
gabriel-milan committed May 2, 2024
2 parents c0599df + 606ca25 commit a9a083b
Showing 7 changed files with 41 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd.yaml

@@ -38,7 +38,7 @@ jobs:

       - name: Install Python dependencies for deploying
         run: |-
-          pip install -U pip poetry
+          pip install -U pip "poetry<1.8"
           poetry config virtualenvs.create false
           poetry install --with dev --with ci
2 changes: 1 addition & 1 deletion .github/workflows/cd_staging.yaml

@@ -40,7 +40,7 @@ jobs:

       - name: Install Python dependencies for deploying
         run: |-
-          pip install -U pip poetry
+          pip install -U pip "poetry<1.8"
           poetry config virtualenvs.create false
           poetry install --with dev --with ci
2 changes: 1 addition & 1 deletion .github/workflows/code-tree-analysis.yaml

@@ -17,7 +17,7 @@ jobs:

       - name: Install Python dependencies for deploying
         run: |-
-          pip install -U pip poetry
+          pip install -U pip "poetry<1.8"
           poetry config virtualenvs.create false
           poetry install --with dev --with ci
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml

@@ -24,7 +24,7 @@ jobs:

       - name: Set up Poetry and upgrade pip
         run: |
-          pip install -U pip poetry
+          pip install -U pip "poetry<1.8"
       - name: Install dependencies
         run: |
15 changes: 8 additions & 7 deletions pipelines/lgpd/tables_bindings/flows.py

@@ -12,7 +12,7 @@
     get_project_tables_iam_policies,
     list_projects,
     merge_dataframes,
-    upload_dataframe_to_gsheets,
+    upload_dataframe_to_bigquery,
 )

 with Flow(
@@ -23,20 +23,21 @@
 ) as rj_escritorio__lgpd__tables_bindings__flow:
     # Parameters
     credentials_secret_name = Parameter("credentials_secret_name")
-    sheet_name = Parameter("sheet_name")
-    spreadsheet_url = Parameter("spreadsheet_url")
+    dataset_id = Parameter("dataset_id")
+    dump_mode = Parameter("dump_mode", default="append")
+    table_id = Parameter("table_id")

     # Flow
     project_ids = list_projects(credentials_secret_name=credentials_secret_name)
     iam_policies_dataframes = get_project_tables_iam_policies.map(
         project_id=project_ids, credentials_secret_name=unmapped(credentials_secret_name)
     )
     merged_dataframe = merge_dataframes(dfs=iam_policies_dataframes)
-    upload_dataframe_to_gsheets(
+    upload_dataframe_to_bigquery(
         dataframe=merged_dataframe,
-        spreadsheet_url=spreadsheet_url,
-        sheet_name=sheet_name,
-        credentials_secret_name=credentials_secret_name,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        dump_mode=dump_mode,
     )
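For illustration, the updated flow could be exercised ad hoc as follows. This is a sketch assuming Prefect 1.x (which the with Flow(...) syntax indicates); the parameter values are the staging defaults from schedules.py below.

from pipelines.lgpd.tables_bindings.flows import (
    rj_escritorio__lgpd__tables_bindings__flow,
)

# Ad-hoc local run; in production the flow runs on its schedule instead.
state = rj_escritorio__lgpd__tables_bindings__flow.run(
    parameters={
        "credentials_secret_name": "LGPD_SERVICE_ACCOUNT_B64",
        "dataset_id": "datalake_gestao",
        "table_id": "tables_bindings",
        "dump_mode": "append",
    }
)
assert state.is_successful()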
5 changes: 3 additions & 2 deletions pipelines/lgpd/tables_bindings/schedules.py

@@ -17,8 +17,9 @@
             ],
             parameter_defaults={
                 "credentials_secret_name": "LGPD_SERVICE_ACCOUNT_B64",
-                "sheet_name": "tables_bindings",
-                "spreadsheet_url": "https://docs.google.com/spreadsheets/d/16gVrhfwMl1TUZ_jbWdNKw1xcNMUTNwtkJCfg7nW52go/edit#gid=0",  # noqa
+                "dataset_id": "datalake_gestao",
+                "dump_mode": "append",
+                "table_id": "tables_bindings",
             },
         ),
     ]
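With the defaults above, the bindings land in a queryable BigQuery table instead of a spreadsheet. A hypothetical smoke-test query (the client credentials, unqualified table path, and partition value are illustrative assumptions; the dataset and table ids are the schedule defaults):

from google.cloud import bigquery

client = bigquery.Client()  # assumes application-default credentials
rows = client.query(
    """
    SELECT COUNT(*) AS n
    FROM `datalake_gestao.tables_bindings`
    WHERE data_particao = "2024-05-02"  -- partition value written by the task
    """
).result()
for row in rows:
    print(row.n)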
67 changes: 26 additions & 41 deletions pipelines/lgpd/tables_bindings/tasks.py

@@ -1,19 +1,22 @@
 # -*- coding: utf-8 -*-
+from datetime import datetime
+from pathlib import Path
 from typing import List
+from uuid import uuid4

-import gspread
 import pandas as pd
 from google.cloud import asset
 from googleapiclient import discovery
 from prefect import task
+from prefeitura_rio.pipelines_utils.bd import create_table_and_upload_to_gcs
 from prefeitura_rio.pipelines_utils.logging import log
+from prefeitura_rio.pipelines_utils.pandas import to_partitions

 from pipelines.lgpd.tables_bindings.utils import (
     batch_get_effective_iam_policies,
     get_gcp_credentials,
     list_tables,
     merge_dataframes_fn,
-    write_data_to_gsheets,
 )

@@ -100,50 +103,32 @@ def merge_dataframes(dfs: List[pd.DataFrame]) -> pd.DataFrame:

 @task
-def upload_dataframe_to_gsheets(
-    dataframe: pd.DataFrame, spreadsheet_url: str, sheet_name: str, credentials_secret_name: str
+def upload_dataframe_to_bigquery(
+    dataframe: pd.DataFrame,
+    dataset_id: str,
+    table_id: str,
+    dump_mode: str,
 ) -> None:
     """
-    Update a Google Sheets spreadsheet with a DataFrame.
+    Upload a DataFrame to BigQuery.

     Args:
-        dataframe: Pandas DataFrame.
-        spreadsheet_url: Google Sheets spreadsheet URL.
-        sheet_name: Google Sheets sheet name.
+        dataframe: The DataFrame to upload.
     """
-    # Get gspread client
-    credentials = get_gcp_credentials(
-        secret_name=credentials_secret_name,
-        scopes=[
-            "https://www.googleapis.com/auth/spreadsheets",
-            "https://www.googleapis.com/auth/drive",
-        ],
+    save_files_dir = Path(f"/tmp/{uuid4()}")
+    dataframe["data_particao"] = datetime.now().strftime("%Y-%m-%d")
+    log(f"Generating partitioned files in {save_files_dir}.")
+    to_partitions(
+        data=dataframe,
+        partition_columns=["data_particao"],
+        savepath=save_files_dir,
     )
-    gspread_client = gspread.authorize(credentials)
-    # Open spreadsheet
-    log(f"Opening Google Sheets spreadsheet {spreadsheet_url} with sheet {sheet_name}.")
-    sheet = gspread_client.open_by_url(spreadsheet_url)
-    worksheet = sheet.worksheet(sheet_name)
-    # Update spreadsheet
-    log("Deleting old data.")
-    worksheet.clear()
-    log("Rewriting headers.")
-    write_data_to_gsheets(
-        worksheet=worksheet,
-        data=[dataframe.columns.tolist()],
+    log("Uploading partitioned files to BigQuery.")
+    create_table_and_upload_to_gcs(
+        data_path=save_files_dir,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        dump_mode=dump_mode,
+        biglake_table=True,
     )
-    log("Updating new data.")
-    write_data_to_gsheets(
-        worksheet=worksheet,
-        data=dataframe.values.tolist(),
-        start_cell="A2",
-    )
-    # Add filters
-    log("Adding filters.")
-    first_col = "A"
-    last_col = chr(ord(first_col) + len(dataframe.columns) - 1)
-    worksheet.set_basic_filter(f"{first_col}:{last_col}")
-    # Resize columns
-    log("Resizing columns.")
-    worksheet.columns_auto_resize(0, len(dataframe.columns) - 1)
     log("Done.")
