Skip to content

Commit

Permalink
Criação do subflow de recaptura de GPS SPPO Realocação (#668)
Browse files Browse the repository at this point in the history
* criação do subflow de recaptura de GPS SPPO Realocação

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* corrige posicionamento do subflow e table_ids

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* corrige task da recaptura de gps sppo realocação

* corrige argumentos das tasks de recaptura realocacao

* corrige o intervalo do query_logs para 10 minutos

* adiciona parametro recapture_window_days no flow de recaptura de realocação

* corrige chamada do parametro recapture_window_days no query_logs

* remove duplcação da primeira linha

* ajusta arredondamento do timestamp na recaptura

* altera line ending

* Corrige sequência de tasks no flow recaptura

* Atualiza changelog

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* ajusta passagem do parametro timestamp na chamada do subflow

* corrige pattern de parse_timestamp_to_str

* atualiza changelog

* altera project para prod

* corrige comentario

* altera project name da materialização

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Rafael <[email protected]>
Co-authored-by: Rodrigo Cunha <[email protected]>
  • Loading branch information
5 people committed Apr 26, 2024
1 parent 987b963 commit fe6b2d5
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 3 deletions.
15 changes: 15 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Changelog - br_rj_riodejaneiro_onibus_gps

## [1.0.0] - 2024-04-26

### Adicionado

- Adiciona flow `recaptura_realocacao_sppo` (https://github.com/prefeitura-rio/pipelines/pull/668)

### Alterado

- Altera flow `recaptura`, incluindo acionamento do `recaptura_realocacao_sppo` (https://github.com/prefeitura-rio/pipelines/pull/668)

### Corrigido

- Corrigido parâmetro `timestamp` do flow `realocacao_sppo` (https://github.com/prefeitura-rio/pipelines/pull/668)
107 changes: 104 additions & 3 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
get_materialization_date_range,
# get_local_dbt_client,
get_raw,
get_rounded_timestamp,
parse_timestamp_to_string,
query_logs,
save_raw_local,
Expand Down Expand Up @@ -79,7 +80,7 @@
rebuild = Parameter("rebuild", False)

# SETUP
timestamp = get_current_timestamp()
timestamp = get_rounded_timestamp(interval_minutes=10)

rename_flow_run = rename_current_flow_run_now_time(
prefix=realocacao_sppo.name + ": ", now_time=timestamp
Expand Down Expand Up @@ -282,20 +283,120 @@
)
captura_sppo_v2.schedule = every_minute

with Flow(
"SMTR: GPS SPPO Realocação - Recaptura (subflow)",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as recaptura_realocacao_sppo:
timestamp = Parameter("timestamp", default=None)
recapture_window_days = Parameter("recapture_window_days", default=1)

# SETUP #
LABELS = get_current_flow_labels()

# Consulta de logs para verificar erros
errors, timestamps, previous_errors = query_logs(
dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value,
datetime_filter=get_rounded_timestamp(timestamp=timestamp, interval_minutes=10),
interval_minutes=10,
recapture_window_days=recapture_window_days,
)

rename_flow_run = rename_current_flow_run_now_time(
prefix=recaptura_realocacao_sppo.name + ": ",
now_time=get_now_time(),
wait=timestamps,
)

# Em caso de erros, executa a recaptura
with case(errors, True):
# SETUP #
partitions = create_date_hour_partition.map(timestamps)
filename = parse_timestamp_to_string.map(timestamps)

filepath = create_local_partition_path.map(
dataset_id=unmapped(constants.GPS_SPPO_RAW_DATASET_ID.value),
table_id=unmapped(constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value),
filename=filename,
partitions=partitions,
)

url = create_api_url_onibus_realocacao.map(timestamp=timestamps)

# EXTRACT #
raw_status = get_raw.map(url)

raw_filepath = save_raw_local.map(status=raw_status, file_path=filepath)

# CLEAN #
treated_status = pre_treatment_br_rj_riodejaneiro_onibus_realocacao.map(
status=raw_status, timestamp=timestamps
)

treated_filepath = save_treated_local.map(
status=treated_status, file_path=filepath
)

# LOAD #
error = bq_upload.map(
dataset_id=unmapped(constants.GPS_SPPO_RAW_DATASET_ID.value),
table_id=unmapped(constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value),
filepath=treated_filepath,
raw_filepath=raw_filepath,
partitions=partitions,
status=treated_status,
)

upload_logs_to_bq.map(
dataset_id=unmapped(constants.GPS_SPPO_RAW_DATASET_ID.value),
parent_table_id=unmapped(constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value),
error=error,
previous_error=previous_errors,
timestamp=timestamps,
recapture=unmapped(True),
)

recaptura_realocacao_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
recaptura_realocacao_sppo.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

with Flow(
"SMTR: GPS SPPO - Tratamento", code_owners=["caio", "fernanda", "boris", "rodrigo"]
) as recaptura:
version = Parameter("version", default=2)
datetime_filter = Parameter("datetime_filter", default=None)
datetime_filter_gps = Parameter("datetime_filter_gps", default=None)
materialize = Parameter("materialize", default=True)
# SETUP #
LABELS = get_current_flow_labels()

rounded_timestamp = get_rounded_timestamp(interval_minutes=60)
rounded_timestamp_str = parse_timestamp_to_string(
timestamp=rounded_timestamp, pattern="%Y-%m-%d %H:%M:%S"
)

# roda o subflow de recaptura da realocação
run_recaptura_realocacao_sppo = create_flow_run(
flow_name=recaptura_realocacao_sppo.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
run_name=recaptura_realocacao_sppo.name,
parameters={"timestamp": rounded_timestamp_str},
)

wait_recaptura_realocacao_sppo = wait_for_flow_run(
run_recaptura_realocacao_sppo,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

errors, timestamps, previous_errors = query_logs(
dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
table_id=constants.GPS_SPPO_RAW_TABLE_ID.value,
datetime_filter=datetime_filter,
datetime_filter=datetime_filter_gps,
upstream_tasks=[wait_recaptura_realocacao_sppo],
)

rename_flow_run = rename_current_flow_run_now_time(
Expand Down

0 comments on commit fe6b2d5

Please sign in to comment.