Add GCS capture to the GTFS upload pipeline (#206)
* initial commit

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Insert tipo_os

* remove file formats

* remove line breaks from the column names of the time-band annex

* Incorporate io.BytesIO

* remove log

* Refactor tipo_os handling

* refactor tipo_os and time-band annex handling

* remove log

* fix processa_ordem_servico

* Add changelogs and change the run_config of the gtfs_captura_nova flow

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Guilherme Botelho <[email protected]>
3 people committed Sep 11, 2024
1 parent 01e2893 commit ca41313
Showing 6 changed files with 166 additions and 61 deletions.
6 changes: 6 additions & 0 deletions pipelines/migration/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog - migration

+## [1.0.2] - 2024-09-11
+
+### Added
+
+- Added logging to the `get_upload_storage_blob` function (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/206)
+
 ## [1.0.1] - 2024-08-29

 ### Added
9 changes: 9 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
@@ -1,5 +1,14 @@
 # Changelog - gtfs

+## [1.1.0] - 2024-09-11
+
+### Changed
+
+- Created a feature for manual upload based on the files in GCS (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/206)
+- Function `get_raw_drive_files` renamed to `get_raw_gtfs_files` and adapted to capture the files from either Google Drive or GCS, selected by the new `upload_from_gcs` parameter of the `gtfs_captura_nova` flow (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/206)
+- Functions `processa_ordem_servico`, `processa_ordem_servico_trajeto_alternativo` and `processa_ordem_servico_faixa_horaria` adjusted to take the `tipo_os` column into account (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/206)
+- Added further column names to be renamed in the `processa_ordem_servico_faixa_horaria` function and fixed the handling of missing columns (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/206)
+
 ## [1.0.8] - 2024-09-06

 ### Added
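
The changelog entry above describes a single task that reads the raw files from either Google Drive or GCS depending on `upload_from_gcs`. A minimal sketch of that selection pattern follows. The names `fetch_raw_files`, `fake_gcs` and `fake_drive` are hypothetical stand-ins and not part of the repository, and the real Drive branch resolves links from `os_control` rather than logical names.

```python
import io
from typing import Callable, Tuple


def fetch_raw_files(
    upload_from_gcs: bool,
    fetch_from_gcs: Callable[[str], bytes],
    fetch_from_drive: Callable[[str], bytes],
) -> Tuple[io.BytesIO, io.BytesIO]:
    """Return the OS spreadsheet and the GTFS archive as in-memory buffers.

    Both fetchers are hypothetical: each takes a logical file name
    ("os" or "gtfs") and returns the raw bytes of that file.
    """
    # Pick the source once, then use the same code path for both files.
    fetch = fetch_from_gcs if upload_from_gcs else fetch_from_drive
    return io.BytesIO(fetch("os")), io.BytesIO(fetch("gtfs"))


def fake_gcs(name: str) -> bytes:
    # Stand-in for a GCS blob download.
    return f"gcs:{name}".encode()


def fake_drive(name: str) -> bytes:
    # Stand-in for a Google Drive download.
    return f"drive:{name}".encode()


os_file, gtfs_file = fetch_raw_files(True, fake_gcs, fake_drive)
print(os_file.getvalue(), gtfs_file.getvalue())
```

Because both branches return file-like buffers, the downstream processing does not need to know which source was used.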
10 changes: 8 additions & 2 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
@@ -26,7 +26,7 @@
 from pipelines.migration.br_rj_riodejaneiro_gtfs.tasks import (
     get_last_capture_os,
     get_os_info,
-    get_raw_drive_files,
+    get_raw_gtfs_files,
     update_last_captured_os,
 )
 from pipelines.migration.tasks import (
@@ -100,6 +100,7 @@
 # )

 with Flow("SMTR: GTFS - Captura/Tratamento") as gtfs_captura_nova:
+    upload_from_gcs = Parameter("upload_from_gcs", default=False)
     materialize_only = Parameter("materialize_only", default=False)
     regular_sheet_index = Parameter("regular_sheet_index", default=None)
     data_versao_gtfs_param = Parameter("data_versao_gtfs", default=None)
@@ -145,10 +146,11 @@
         filename=unmapped(filename),
     )

-    raw_filepaths, primary_keys = get_raw_drive_files(
+    raw_filepaths, primary_keys = get_raw_gtfs_files(
         os_control=os_control,
         local_filepath=local_filepaths,
         regular_sheet_index=regular_sheet_index,
+        upload_from_gcs=upload_from_gcs,
     )

     transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map(
@@ -231,6 +233,10 @@
 gtfs_captura_nova.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value,
     labels=[constants.RJ_SMTR_AGENT_LABEL.value],
+    cpu_limit="1000m",
+    memory_limit="4600Mi",
+    cpu_request="500m",
+    memory_request="1000Mi",
 )
 gtfs_captura_nova.state_handlers = [
     handler_inject_bd_credentials,
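
The `cpu_*` and `memory_*` values added to `KubernetesRun` use Kubernetes resource quantity notation: a CPU value ending in `m` is in millicores (thousandths of a core), and a memory value ending in `Mi` is in mebibytes (2^20 bytes). The sketch below converts such strings to plain numbers to make the limits easier to compare. The helper names `cpu_to_cores` and `memory_to_bytes` are hypothetical, and only the suffixes listed in the code are handled.

```python
# Binary suffixes handled by this sketch (a subset of the Kubernetes notation).
_BINARY_SUFFIXES = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30}


def cpu_to_cores(quantity: str) -> float:
    """Convert a CPU quantity such as "500m" or "2" to a number of cores."""
    if quantity.endswith("m"):
        # Millicores: 1000m equals one full core.
        return int(quantity[:-1]) / 1000
    return float(quantity)


def memory_to_bytes(quantity: str) -> int:
    """Convert a memory quantity such as "4600Mi" to bytes."""
    for suffix, factor in _BINARY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    # No recognised suffix: assume the value is already in bytes.
    return int(quantity)


print(cpu_to_cores("500m"), cpu_to_cores("1000m"))
print(memory_to_bytes("1000Mi"), memory_to_bytes("4600Mi"))
```

Read this way, the flow requests half a core and 1000 MiB of memory, and is capped at one core and 4600 MiB.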
58 changes: 42 additions & 16 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py
@@ -2,6 +2,7 @@
 """
 Tasks for gtfs
 """
+import io
 import os
 import zipfile
 from datetime import datetime
@@ -24,7 +25,7 @@
     processa_ordem_servico_faixa_horaria,
     processa_ordem_servico_trajeto_alternativo,
 )
-from pipelines.migration.utils import save_raw_local_func
+from pipelines.migration.utils import get_upload_storage_blob, save_raw_local_func


 @task
@@ -153,13 +154,18 @@ def get_os_info(last_captured_os: str = None, data_versao_gtfs: str = None) -> d


 @task(nout=2)
-def get_raw_drive_files(os_control, local_filepath: list, regular_sheet_index: int = None):
+def get_raw_gtfs_files(
+    os_control, local_filepath: list, regular_sheet_index: int = None, upload_from_gcs: bool = False
+):
     """
-    Downloads raw files from Google Drive and processes them.
+    Downloads raw files and processes them.
     Args:
         os_control (dict): A dictionary containing information about the OS (Ordem de Serviço).
         local_filepath (list): A list of local file paths where the downloaded files will be saved.
         regular_sheet_index (int, optional): The index of the regular sheet. Defaults to None.
+        upload_from_gcs (bool, optional):
+            A boolean indicating whether the files should be uploaded from GCS. Defaults to False.
     Returns:
         raw_filepaths (list): A list of file paths where the downloaded raw files are saved.
@@ -170,22 +176,42 @@ def get_raw_drive_files(os_control, local_filepath: list, regular_sheet_index: i

     log(f"Baixando arquivos: {os_control}")

-    # Authenticate using the credentials file
-    credentials = service_account.Credentials.from_service_account_file(
-        filename=os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
-        scopes=["https://www.googleapis.com/auth/drive.readonly"],
-    )
+    if upload_from_gcs:
+        log("Baixando arquivos através do GCS")

-    # Create the Google Drive and Google Sheets API service
-    drive_service = build("drive", "v3", credentials=credentials)
+        # Download the OS spreadsheet
+        file_bytes_os = io.BytesIO(
+            get_upload_storage_blob(
+                dataset_id=constants.GTFS_DATASET_ID.value, filename="os"
+            ).download_as_bytes()
+        )

-    # Download the OS spreadsheet
-    file_link = os_control["Link da OS"]
-    file_bytes_os = download_xlsx(file_link=file_link, drive_service=drive_service)
+        # Download the GTFS
+        file_bytes_gtfs = io.BytesIO(
+            get_upload_storage_blob(
+                dataset_id=constants.GTFS_DATASET_ID.value, filename="gtfs"
+            ).download_as_bytes()
+        )

-    # Download the GTFS
-    file_link = os_control["Link do GTFS"]
-    file_bytes_gtfs = download_file(file_link=file_link, drive_service=drive_service)
+    else:
+        log("Baixando arquivos através do Google Drive")
+
+        # Authenticate using the credentials file
+        credentials = service_account.Credentials.from_service_account_file(
+            filename=os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
+            scopes=["https://www.googleapis.com/auth/drive.readonly"],
+        )
+
+        # Create the Google Drive and Google Sheets API service
+        drive_service = build("drive", "v3", credentials=credentials)
+
+        # Download the OS spreadsheet
+        file_link = os_control["Link da OS"]
+        file_bytes_os = download_xlsx(file_link=file_link, drive_service=drive_service)
+
+        # Download the GTFS
+        file_link = os_control["Link do GTFS"]
+        file_bytes_gtfs = download_file(file_link=file_link, drive_service=drive_service)

     # Save the sheet names
     sheetnames = xl.load_workbook(file_bytes_os).sheetnames
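
In the GCS branch, the bytes returned by `download_as_bytes()` are wrapped in `io.BytesIO` so that the rest of the task can treat them as a file-like object, which is what readers such as `zipfile.ZipFile` expect. A minimal, standard-library-only sketch of that idea follows. The helper `build_zip_bytes` and the sample `stops.txt` content are made up for illustration and stand in for the downloaded GTFS archive.

```python
import io
import zipfile


def build_zip_bytes(files: dict) -> bytes:
    """Build a zip archive in memory and return its raw bytes.

    Hypothetical helper standing in for bytes downloaded from storage.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        for name, content in files.items():
            archive.writestr(name, content)
    return buffer.getvalue()


# Pretend these bytes came from a storage download.
raw_bytes = build_zip_bytes({"stops.txt": "stop_id,stop_name\n1,Central\n"})

# Wrapping the bytes gives a seekable file-like object, with no temporary file on disk.
file_bytes_gtfs = io.BytesIO(raw_bytes)

with zipfile.ZipFile(file_bytes_gtfs) as archive:
    names = archive.namelist()
    first_line = archive.read("stops.txt").decode().splitlines()[0]

print(names, first_line)
```

The same wrapping lets both branches hand a file-like object to the spreadsheet and archive readers, regardless of where the bytes originated.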