From 940302f803ca0670f22a714115206c67f250d19e Mon Sep 17 00:00:00 2001
From: Gabriel Gazola Milan
Date: Thu, 2 May 2024 14:11:13 -0300
Subject: [PATCH] feat: materialize data to BigQuery

---
 pipelines/lgpd/tables_bindings/flows.py     | 15 ++---
 pipelines/lgpd/tables_bindings/schedules.py |  5 +-
 pipelines/lgpd/tables_bindings/tasks.py     | 70 +++++++++------------
 3 files changed, 40 insertions(+), 50 deletions(-)

diff --git a/pipelines/lgpd/tables_bindings/flows.py b/pipelines/lgpd/tables_bindings/flows.py
index 18150e5..3320fd2 100644
--- a/pipelines/lgpd/tables_bindings/flows.py
+++ b/pipelines/lgpd/tables_bindings/flows.py
@@ -12,7 +12,7 @@
     get_project_tables_iam_policies,
     list_projects,
     merge_dataframes,
-    upload_dataframe_to_gsheets,
+    upload_dataframe_to_bigquery,
 )
 
 with Flow(
@@ -23,8 +23,9 @@
 ) as rj_escritorio__lgpd__tables_bindings__flow:
     # Parameters
     credentials_secret_name = Parameter("credentials_secret_name")
-    sheet_name = Parameter("sheet_name")
-    spreadsheet_url = Parameter("spreadsheet_url")
+    dataset_id = Parameter("dataset_id")
+    dump_mode = Parameter("dump_mode", default="append")
+    table_id = Parameter("table_id")
 
     # Flow
     project_ids = list_projects(credentials_secret_name=credentials_secret_name)
@@ -32,11 +33,11 @@
         project_id=project_ids, credentials_secret_name=unmapped(credentials_secret_name)
     )
     merged_dataframe = merge_dataframes(dfs=iam_policies_dataframes)
-    upload_dataframe_to_gsheets(
+    upload_dataframe_to_bigquery(
         dataframe=merged_dataframe,
-        spreadsheet_url=spreadsheet_url,
-        sheet_name=sheet_name,
-        credentials_secret_name=credentials_secret_name,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        dump_mode=dump_mode,
     )
 
 
diff --git a/pipelines/lgpd/tables_bindings/schedules.py b/pipelines/lgpd/tables_bindings/schedules.py
index a4ca28a..5035671 100644
--- a/pipelines/lgpd/tables_bindings/schedules.py
+++ b/pipelines/lgpd/tables_bindings/schedules.py
@@ -17,8 +17,9 @@
     ],
     parameter_defaults={
         "credentials_secret_name": "LGPD_SERVICE_ACCOUNT_B64",
-        "sheet_name": "tables_bindings",
-        "spreadsheet_url": "https://docs.google.com/spreadsheets/d/16gVrhfwMl1TUZ_jbWdNKw1xcNMUTNwtkJCfg7nW52go/edit#gid=0",  # noqa
+        "dataset_id": "lgpd",
+        "dump_mode": "append",
+        "table_id": "tables_bindings",
     },
 ),
 ]
diff --git a/pipelines/lgpd/tables_bindings/tasks.py b/pipelines/lgpd/tables_bindings/tasks.py
index 02b6b41..f8a4512 100644
--- a/pipelines/lgpd/tables_bindings/tasks.py
+++ b/pipelines/lgpd/tables_bindings/tasks.py
@@ -1,19 +1,22 @@
 # -*- coding: utf-8 -*-
+from datetime import datetime
+from pathlib import Path
 from typing import List
+from uuid import uuid4
 
-import gspread
 import pandas as pd
 from google.cloud import asset
 from googleapiclient import discovery
 from prefect import task
+from prefeitura_rio.pipelines_utils.bd import create_table_and_upload_to_gcs
 from prefeitura_rio.pipelines_utils.logging import log
+from prefeitura_rio.pipelines_utils.pandas import to_partitions
 
 from pipelines.lgpd.tables_bindings.utils import (
     batch_get_effective_iam_policies,
     get_gcp_credentials,
     list_tables,
     merge_dataframes_fn,
-    write_data_to_gsheets,
 )
 
 
@@ -100,50 +103,35 @@ def merge_dataframes(dfs: List[pd.DataFrame]) -> pd.DataFrame:
 
 
 @task
-def upload_dataframe_to_gsheets(
-    dataframe: pd.DataFrame, spreadsheet_url: str, sheet_name: str, credentials_secret_name: str
+def upload_dataframe_to_bigquery(
+    dataframe: pd.DataFrame,
+    dataset_id: str,
+    table_id: str,
+    dump_mode: str,
 ) -> None:
     """
-    Update a Google Sheets spreadsheet with a DataFrame.
+    Upload a DataFrame to BigQuery as a BigLake table, partitioned by ingestion date.
 
     Args:
-        dataframe: Pandas DataFrame.
-        spreadsheet_url: Google Sheets spreadsheet URL.
-        sheet_name: Google Sheets sheet name.
+        dataframe: The DataFrame to upload.
+        dataset_id: The BigQuery dataset ID.
+        table_id: The BigQuery table ID.
+        dump_mode: How data is written to an existing table (e.g. "append").
     """
-    # Get gspread client
-    credentials = get_gcp_credentials(
-        secret_name=credentials_secret_name,
-        scopes=[
-            "https://www.googleapis.com/auth/spreadsheets",
-            "https://www.googleapis.com/auth/drive",
-        ],
+    save_files_dir = Path(f"/tmp/{uuid4()}")
+    dataframe["data_particao"] = datetime.now().strftime("%Y-%m-%d")
+    log(f"Generating partitioned files in {save_files_dir}.")
+    to_partitions(
+        data=dataframe,
+        partition_columns=["data_particao"],
+        savepath=save_files_dir,
     )
-    gspread_client = gspread.authorize(credentials)
-    # Open spreadsheet
-    log(f"Opening Google Sheets spreadsheet {spreadsheet_url} with sheet {sheet_name}.")
-    sheet = gspread_client.open_by_url(spreadsheet_url)
-    worksheet = sheet.worksheet(sheet_name)
-    # Update spreadsheet
-    log("Deleting old data.")
-    worksheet.clear()
-    log("Rewriting headers.")
-    write_data_to_gsheets(
-        worksheet=worksheet,
-        data=[dataframe.columns.tolist()],
+    log("Uploading partitioned files to BigQuery.")
+    create_table_and_upload_to_gcs(
+        data_path=save_files_dir,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        dump_mode=dump_mode,
+        biglake_table=True,
     )
-    log("Updating new data.")
-    write_data_to_gsheets(
-        worksheet=worksheet,
-        data=dataframe.values.tolist(),
-        start_cell="A2",
-    )
-    # Add filters
-    log("Adding filters.")
-    first_col = "A"
-    last_col = chr(ord(first_col) + len(dataframe.columns) - 1)
-    worksheet.set_basic_filter(f"{first_col}:{last_col}")
-    # Resize columns
-    log("Resizing columns.")
-    worksheet.columns_auto_resize(0, len(dataframe.columns) - 1)
     log("Done.")
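
Reviewer note: the heavy lifting in the new task is done by two prefeitura_rio
helpers. `to_partitions` splits the DataFrame into Hive-style partition folders
on local disk, and `create_table_and_upload_to_gcs` pushes those folders to GCS
and registers the BigQuery table (here with biglake_table=True). As a rough
mental model only, a minimal sketch of the partitioning step is below; the
function name `to_partitions_sketch` and the CSV layout are assumptions
inferred from the call site above (partition_columns/savepath and the
data_particao column), not the library's actual implementation:

    from pathlib import Path
    from uuid import uuid4

    import pandas as pd

    def to_partitions_sketch(data: pd.DataFrame, partition_columns: list, savepath: Path) -> None:
        """Approximation (assumed) of what to_partitions does with the call above."""
        for keys, group in data.groupby(partition_columns):
            keys = keys if isinstance(keys, tuple) else (keys,)
            # Hive-style layout, e.g. <savepath>/data_particao=2024-05-02/<uuid>.csv
            subdir = Path(savepath).joinpath(
                *(f"{col}={val}" for col, val in zip(partition_columns, keys))
            )
            subdir.mkdir(parents=True, exist_ok=True)
            # Partition values live in the folder name, so drop the columns from the file
            group.drop(columns=partition_columns).to_csv(subdir / f"{uuid4()}.csv", index=False)

Since the flow stamps data_particao with the run date and the schedule defaults
dump_mode to "append", each scheduled run should add one new
data_particao=<run date> partition to lgpd.tables_bindings rather than
rewriting the table.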