Skip to content

Commit

Permalink
Merge branch 'master' into pre-commit-ci-update-config
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Jan 26, 2024
2 parents cb13f77 + 5f6589a commit 0dd55aa
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 62 deletions.
120 changes: 68 additions & 52 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
# BILHETAGEM INTEGRAÇÃO - CAPTURA A CADA MINUTO #

bilhetagem_integracao_captura = deepcopy(default_capture_flow)
bilhetagem_integracao_captura.name = "SMTR: Bilhetagem Integração - Captura"
bilhetagem_integracao_captura.name = "SMTR: Bilhetagem Integração - Captura (subflow)"
bilhetagem_integracao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_integracao_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
Expand All @@ -79,7 +79,6 @@
| constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value,
)

bilhetagem_integracao_captura.schedule = every_minute

# BILHETAGEM GPS - CAPTURA A CADA 5 MINUTOS #

Expand Down Expand Up @@ -295,23 +294,6 @@
raise_final_state=True,
)

# Recaptura Integração

run_recaptura_integracao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value,
upstream_tasks=[wait_recaptura_transacao_true],
)

wait_recaptura_integracao_true = wait_for_flow_run(
run_recaptura_integracao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

# Captura Auxiliar

runs_captura = create_flow_run.map(
Expand All @@ -321,7 +303,7 @@
labels=unmapped(LABELS),
)

runs_captura.set_upstream(wait_recaptura_integracao_true)
runs_captura.set_upstream(wait_recaptura_transacao_true)

wait_captura_true = wait_for_flow_run.map(
runs_captura,
Expand Down Expand Up @@ -353,9 +335,8 @@
wait_captura_false,
wait_recaptura_auxiliar_false,
wait_recaptura_transacao_false,
wait_recaptura_integracao_false,
) = task(
lambda: [None, None, None, None], name="assign_none_to_capture_runs", nout=4
lambda: [None, None, None], name="assign_none_to_capture_runs", nout=3
)()

wait_captura = merge(wait_captura_false, wait_captura_true)
Expand All @@ -365,9 +346,6 @@
wait_recaptura_transacao = merge(
wait_recaptura_transacao_false, wait_recaptura_transacao_true
)
wait_recaptura_integracao = merge(
wait_recaptura_integracao_false, wait_recaptura_integracao_true
)

with case(materialize, True):
materialize_timestamp = get_current_timestamp(
Expand All @@ -382,7 +360,6 @@
wait_captura,
wait_recaptura_auxiliar,
wait_recaptura_transacao,
wait_recaptura_integracao,
],
parameters={
"timestamp": materialize_timestamp,
Expand All @@ -396,31 +373,12 @@
raise_final_state=True,
)

run_materializacao_integracao = create_flow_run(
flow_name=bilhetagem_materializacao_integracao.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[
wait_materializacao_transacao,
],
parameters={
"timestamp": materialize_timestamp,
},
)

wait_materializacao_integracao = wait_for_flow_run(
run_materializacao_integracao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

run_materializacao_gps_validador = create_flow_run(
flow_name=bilhetagem_materializacao_gps_validador.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[
wait_materializacao_integracao,
wait_materializacao_transacao,
],
parameters={
"timestamp": materialize_timestamp,
Expand Down Expand Up @@ -478,6 +436,20 @@
raise_final_state=unmapped(True),
)

runs_captura_integracao = create_flow_run(
flow_name=unmapped(bilhetagem_integracao_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
labels=unmapped(LABELS),
upstream_tasks=[wait_captura],
)

wait_captura_integracao = wait_for_flow_run(
runs_captura_integracao,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Recaptura #

runs_recaptura = create_flow_run.map(
Expand All @@ -496,23 +468,48 @@
raise_final_state=unmapped(True),
)

# Recaptura Integração

run_recaptura_integracao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value,
upstream_tasks=[wait_recaptura_true, wait_captura_integracao],
)

wait_recaptura_integracao_true = wait_for_flow_run(
run_recaptura_integracao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

with case(capture, False):
wait_recaptura_false = task(lambda: None, name="assign_none_to_recapture")()
wait_recaptura_false, wait_recaptura_integracao_false = task(
lambda: [None, None], name="assign_none_to_recapture", nout=2
)()

wait_recaptura = merge(wait_recaptura_true, wait_recaptura_false)
wait_recaptura_integracao = merge(
wait_recaptura_integracao_true, wait_recaptura_integracao_false
)

# Materialização #

with case(materialize, True):
materialize_timestamp = get_current_timestamp(
timestamp=timestamp,
return_str=True,
)

run_materializacao = create_flow_run(
flow_name=bilhetagem_materializacao_ordem_pagamento.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[wait_recaptura],
upstream_tasks=[wait_recaptura, wait_recaptura_integracao],
parameters={
"timestamp": get_current_timestamp(
timestamp=timestamp, return_str=True
),
"timestamp": materialize_timestamp,
},
)

Expand All @@ -523,8 +520,27 @@
raise_final_state=True,
)

run_materializacao_integracao = create_flow_run(
flow_name=bilhetagem_materializacao_integracao.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[
wait_materializacao,
],
parameters={
"timestamp": materialize_timestamp,
},
)

wait_materializacao_integracao = wait_for_flow_run(
run_materializacao_integracao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

bilhetagem_ordem_pagamento_captura_tratamento.set_reference_tasks(
[wait_materializacao, wait_recaptura]
[wait_materializacao_integracao, wait_recaptura]
)

bilhetagem_ordem_pagamento_captura_tratamento.storage = GCS(
Expand Down
20 changes: 10 additions & 10 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class constants(Enum): # pylint: disable=c0103
""",
},
"primary_key": ["id"],
"interval_minutes": 1,
"interval_minutes": 1440,
}

BILHETAGEM_TRACKING_CAPTURE_PARAMS = {
Expand Down Expand Up @@ -455,32 +455,32 @@ class constants(Enum): # pylint: disable=c0103
},
]

BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"table_id": BILHETAGEM_TRANSACAO_CAPTURE_PARAMS["table_id"],
"table_id": "integracao",
"upstream": True,
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "datetime_transacao",
"delay_hours": 1,
"table_run_datetime_column_name": "datetime_captura",
"delay_hours": 0,
},
"version": {},
},
"exclude": "+operadoras +consorcios",
}

BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"table_id": BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS["table_id"],
"table_id": "passageiros_hora",
"upstream": True,
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "datetime_captura",
"table_run_datetime_column_name": "datetime_transacao",
"delay_hours": 1,
"table_alias": "integracao",
},
"version": {},
},
"exclude": "+diretorio_operadoras +diretorio_consorcios",
"exclude": "integracao matriz_integracao",
}

BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = {
Expand Down

0 comments on commit 0dd55aa

Please sign in to comment.