Merge pull request #9 from prefeitura-rio/staging/tables-bindings
[LGPD] Add a pipeline that lists table access permissions
gabriel-milan committed Feb 21, 2024
2 parents eab0d1a + 44d8c1e commit c0599df
Showing 13 changed files with 532 additions and 10 deletions.
1 change: 1 addition & 0 deletions .github/workflows/lint.yaml
@@ -15,6 +15,7 @@ jobs:
         uses: hadolint/hadolint-action@54c9adbab1582c2ef04b2016b760714a4bfde3cf
         with:
           dockerfile: Dockerfile
+          ignore: DL3008
 
       - name: Set up Python
         uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e
2 changes: 1 addition & 1 deletion Dockerfile
@@ -6,7 +6,7 @@ FROM python:${PYTHON_VERSION}
 
 # Install git
 RUN apt-get update && \
-    apt-get install -y git ffmpeg libsm6 libxext6 && \
+    apt-get install -y --no-install-recommends git ffmpeg libsm6 libxext6 && \
     apt-get clean && \
     rm -rf /var/lib/apt/lists/*

3 changes: 2 additions & 1 deletion pipelines/deteccao_alagamento_cameras/__init__.py
@@ -1 +1,2 @@
-from pipelines.deteccao_alagamento_cameras.flooding_detection.flows import *
+# -*- coding: utf-8 -*-
+from pipelines.deteccao_alagamento_cameras.flooding_detection.flows import * # noqa
@@ -217,7 +217,7 @@ def get_prediction(
     )
 
     responses.resolve()
-    if type(responses) == tuple:
+    if isinstance(responses, tuple):
         responses = responses[0]
     json_string = responses.text.replace("```json\n", "").replace("\n```", "")
     label = json.loads(json_string)["label"]
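A note on the one-line change above: type(responses) == tuple is an exact-type comparison (and is flagged by flake8's E721), while isinstance also accepts subclasses, which is almost always the intended behavior. A minimal illustration:

# isinstance() is subclass-aware; an exact type comparison is not.
class Base: ...
class Sub(Base): ...

obj = Sub()
print(type(obj) == Base)      # False: exact-type check ignores inheritance
print(isinstance(obj, Base))  # True: isinstance respects subclassing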
1 change: 1 addition & 0 deletions pipelines/flows.py
@@ -4,5 +4,6 @@
 """
 from pipelines.deteccao_alagamento_cameras import * # noqa
 from pipelines.exemplo import * # noqa
+from pipelines.lgpd import * # noqa
 from pipelines.stress import * # noqa
 from pipelines.templates import * # noqa
2 changes: 2 additions & 0 deletions pipelines/lgpd/__init__.py
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from pipelines.lgpd.tables_bindings.flows import * # noqa
Empty file.
48 changes: 48 additions & 0 deletions pipelines/lgpd/tables_bindings/flows.py
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.utilities.edges import unmapped
from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.constants import constants
from pipelines.lgpd.tables_bindings.schedules import update_tables_bindings_schedule
from pipelines.lgpd.tables_bindings.tasks import (
get_project_tables_iam_policies,
list_projects,
merge_dataframes,
upload_dataframe_to_gsheets,
)

with Flow(
name="LGPD - Lista de permissões de acesso a tabelas do BigQuery",
state_handlers=[handler_inject_bd_credentials],
skip_if_running=True,
parallelism=5,
) as rj_escritorio__lgpd__tables_bindings__flow:
# Parameters
credentials_secret_name = Parameter("credentials_secret_name")
sheet_name = Parameter("sheet_name")
spreadsheet_url = Parameter("spreadsheet_url")

# Flow
project_ids = list_projects(credentials_secret_name=credentials_secret_name)
iam_policies_dataframes = get_project_tables_iam_policies.map(
project_id=project_ids, credentials_secret_name=unmapped(credentials_secret_name)
)
merged_dataframe = merge_dataframes(dfs=iam_policies_dataframes)
upload_dataframe_to_gsheets(
dataframe=merged_dataframe,
spreadsheet_url=spreadsheet_url,
sheet_name=sheet_name,
credentials_secret_name=credentials_secret_name,
)


rj_escritorio__lgpd__tables_bindings__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
rj_escritorio__lgpd__tables_bindings__flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value],
)
rj_escritorio__lgpd__tables_bindings__flow.schedule = update_tables_bindings_schedule
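The flow can also be exercised locally before registering it; a sketch, assuming the referenced secret is available in your environment and reusing the parameter defaults from the schedule below (the spreadsheet URL is elided):

# Hypothetical local run (Prefect 1.x, which this repo uses).
from pipelines.lgpd.tables_bindings.flows import (
    rj_escritorio__lgpd__tables_bindings__flow as flow,
)

state = flow.run(
    run_on_schedule=False,  # ignore the attached schedule for a one-off run
    parameters={
        "credentials_secret_name": "LGPD_SERVICE_ACCOUNT_B64",
        "sheet_name": "tables_bindings",
        "spreadsheet_url": "https://docs.google.com/spreadsheets/d/...",  # elided
    },
)
print(state.is_successful())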
25 changes: 25 additions & 0 deletions pipelines/lgpd/tables_bindings/schedules.py
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

from pipelines.constants import constants

update_tables_bindings_schedule = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2023, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_ESCRITORIO_AGENT_LABEL.value,
],
parameter_defaults={
"credentials_secret_name": "LGPD_SERVICE_ACCOUNT_B64",
"sheet_name": "tables_bindings",
"spreadsheet_url": "https://docs.google.com/spreadsheets/d/16gVrhfwMl1TUZ_jbWdNKw1xcNMUTNwtkJCfg7nW52go/edit#gid=0", # noqa
},
),
]
)
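One pytz subtlety in the clock above (a behavior note, not a change in this commit): passing tzinfo=pytz.timezone(...) straight into datetime() attaches the zone's local mean time offset, -03:06 for America/Sao_Paulo, rather than the expected -03:00; pytz's documented pattern is localize(). Prefect only uses the start date to anchor the interval, so the skew is harmless here, but worth knowing:

import pytz
from datetime import datetime

tz = pytz.timezone("America/Sao_Paulo")
print(datetime(2023, 1, 1, tzinfo=tz).utcoffset())    # -1 day, 20:54:00 (-03:06, LMT)
print(tz.localize(datetime(2023, 1, 1)).utcoffset())  # -1 day, 21:00:00 (-03:00)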
149 changes: 149 additions & 0 deletions pipelines/lgpd/tables_bindings/tasks.py
@@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
from typing import List

import gspread
import pandas as pd
from google.cloud import asset
from googleapiclient import discovery
from prefect import task
from prefeitura_rio.pipelines_utils.logging import log

from pipelines.lgpd.tables_bindings.utils import (
batch_get_effective_iam_policies,
get_gcp_credentials,
list_tables,
merge_dataframes_fn,
write_data_to_gsheets,
)


@task
def list_projects(credentials_secret_name: str) -> List[str]:
"""
Lists all GCP projects that we have access to.
Args:
mode: Credentials mode.
exclude_dev: Exclude projects that ends with "-dev".
Returns:
List of project IDs.
"""
credentials = get_gcp_credentials(secret_name=credentials_secret_name)
service = discovery.build("cloudresourcemanager", "v1", credentials=credentials)
request = service.projects().list()
projects = []
while request is not None:
response = request.execute()
for project in response.get("projects", []):
project_id = project["projectId"]
log(f"Found project {project_id}.")
projects.append(project_id)
request = service.projects().list_next(previous_request=request, previous_response=response)
log(f"Found {len(projects)} projects.")
return projects
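# Sketch, not part of this commit: get_gcp_credentials comes from
# pipelines.lgpd.tables_bindings.utils, which this commit adds but which is
# not expanded in this view. Judging by the secret name
# LGPD_SERVICE_ACCOUNT_B64, it likely decodes a base64-encoded service
# account key. A minimal version under that assumption (the env-var lookup
# is a stand-in for the real secret backend):
import base64
import json
import os
from typing import List

from google.oauth2 import service_account


def get_gcp_credentials_sketch(secret_name: str, scopes: List[str] = None):
    info = json.loads(base64.b64decode(os.environ[secret_name]))
    credentials = service_account.Credentials.from_service_account_info(info)
    if scopes:
        credentials = credentials.with_scopes(scopes)
    return credentials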


@task
def get_project_tables_iam_policies(project_id: str, credentials_secret_name: str) -> pd.DataFrame:
"""
Get IAM policies for a list of tables in a given project.
Args:
        project_id (str): The project ID.
        credentials_secret_name (str): Name of the secret that holds the GCP service account credentials.
Returns:
pd.DataFrame: A DataFrame with the IAM policies for the given tables. The dataframe contains
the following columns:
- project_id: The project ID.
- dataset_id: The dataset ID.
- table_id: The table ID.
- attached_resource: The resource to which the policy is attached.
- role: The role for the binding.
- member: The member for the binding.
"""
credentials = get_gcp_credentials(secret_name=credentials_secret_name)
tables = list_tables(project_id, credentials)
log(f"Found {len(tables)} tables in project {project_id}.")
client = asset.AssetServiceClient(credentials=credentials)
scope = f"projects/{project_id}"
# Split tables in batches of 20 (maximum allowed by the API)
tables_batches = [tables[i : i + 20] for i in range(0, len(tables), 20)] # noqa
dfs = []
for i, table_batch in enumerate(tables_batches):
log(
f"Getting IAM policies for batch {i + 1}/{len(tables_batches)} (project_id={project_id})." # noqa
)
df_batch = batch_get_effective_iam_policies(client=client, scope=scope, names=table_batch)
dfs.append(df_batch)
if len(dfs) == 0:
log(f"No IAM policies found for project {project_id}.")
return pd.DataFrame()
df = merge_dataframes_fn(dfs)
log(f"Found {len(df)} IAM policies for project {project_id}.")
return df
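# Sketch, not part of this commit: batch_get_effective_iam_policies (from the
# same utils module, not expanded in this view) presumably wraps the Cloud
# Asset API call of the same name and flattens the response into the columns
# documented above. A plausible version, assuming names holds full resource
# names like //bigquery.googleapis.com/projects/P/datasets/D/tables/T:
from typing import List

import pandas as pd
from google.cloud import asset


def batch_get_effective_iam_policies_sketch(
    client: asset.AssetServiceClient, scope: str, names: List[str]
) -> pd.DataFrame:
    response = client.batch_get_effective_iam_policies(
        request={"scope": scope, "names": names}
    )
    rows = []
    for result in response.policy_results:
        # e.g. //bigquery.googleapis.com/projects/P/datasets/D/tables/T
        parts = result.full_resource_name.split("/")
        project_id, dataset_id, table_id = parts[4], parts[6], parts[8]
        for policy_info in result.policies:
            for binding in policy_info.policy.bindings:
                for member in binding.members:
                    rows.append(
                        {
                            "project_id": project_id,
                            "dataset_id": dataset_id,
                            "table_id": table_id,
                            "attached_resource": policy_info.attached_resource,
                            "role": binding.role,
                            "member": member,
                        }
                    )
    return pd.DataFrame(rows)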


@task
def merge_dataframes(dfs: List[pd.DataFrame]) -> pd.DataFrame:
"""
Merge a list of DataFrames into a single DataFrame.
Args:
dfs (List[pd.DataFrame]): The DataFrames to merge.
Returns:
pd.DataFrame: The merged DataFrame.
"""
log(f"Merging {len(dfs)} DataFrames.")
return merge_dataframes_fn(dfs)
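# Sketch, not part of this commit: merge_dataframes_fn is plausibly a thin
# wrapper around pd.concat with a fresh index, guarding against empty input
# (e.g. when every project returned an empty frame):
from typing import List

import pandas as pd


def merge_dataframes_fn_sketch(dfs: List[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()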


@task
def upload_dataframe_to_gsheets(
dataframe: pd.DataFrame, spreadsheet_url: str, sheet_name: str, credentials_secret_name: str
) -> None:
"""
Update a Google Sheets spreadsheet with a DataFrame.
Args:
dataframe: Pandas DataFrame.
spreadsheet_url: Google Sheets spreadsheet URL.
        sheet_name: Google Sheets sheet name.
        credentials_secret_name: Name of the secret that holds the GCP service account credentials.
"""
# Get gspread client
credentials = get_gcp_credentials(
secret_name=credentials_secret_name,
scopes=[
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
],
)
gspread_client = gspread.authorize(credentials)
# Open spreadsheet
log(f"Opening Google Sheets spreadsheet {spreadsheet_url} with sheet {sheet_name}.")
sheet = gspread_client.open_by_url(spreadsheet_url)
worksheet = sheet.worksheet(sheet_name)
# Update spreadsheet
log("Deleting old data.")
worksheet.clear()
log("Rewriting headers.")
write_data_to_gsheets(
worksheet=worksheet,
data=[dataframe.columns.tolist()],
)
log("Updating new data.")
write_data_to_gsheets(
worksheet=worksheet,
data=dataframe.values.tolist(),
start_cell="A2",
)
# Add filters
log("Adding filters.")
first_col = "A"
last_col = chr(ord(first_col) + len(dataframe.columns) - 1)
worksheet.set_basic_filter(f"{first_col}:{last_col}")
# Resize columns
log("Resizing columns.")
worksheet.columns_auto_resize(0, len(dataframe.columns) - 1)
log("Done.")