Skip to content

Commit

Permalink
Cria função handler_skip_if_running_tolerant (#222)
Browse files Browse the repository at this point in the history
* add handler_skip_if_running_tolerant

* test

* add changelog

* fix handler

* remove alteracoes para teste

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
pixuimpou and mergify[bot] committed Sep 13, 2024
1 parent a11dfbf commit c35e9b4
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# Changelog - br_rj_riodejaneiro_bilhetagem

## [1.4.4] - 2024-09-12
## [1.4.5] - 2024-09-13

### Alterado
- Cria tolerância para pular a run no State Handler do flow `bilhetagem_tracking_captura` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/222)

## [1.4.4] - 2024-09-12
s
### Corrigido

`bilhetagem_tracking_captura`:
Expand Down
3 changes: 2 additions & 1 deletion pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
)
from pipelines.treatment.templates.tasks import run_data_quality_checks
from pipelines.utils.dataplex import DataQualityCheckArgs
from pipelines.utils.prefect import handler_skip_if_running_tolerant

# BILHETAGEM TRANSAÇÃO - CAPTURA A CADA MINUTO #

Expand Down Expand Up @@ -144,7 +145,7 @@
bilhetagem_tracking_captura.state_handlers = [
handler_inject_bd_credentials,
handler_initialize_sentry,
handler_skip_if_running,
handler_skip_if_running_tolerant(tolerance_minutes=3),
]


Expand Down
54 changes: 54 additions & 0 deletions pipelines/utils/prefect.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# -*- coding: utf-8 -*-
"""Prefect functions"""
import inspect
import time

# import json
from typing import Any, Callable, Dict, Type, Union

import prefect
from prefect import unmapped
from prefect.backend.flow_run import FlowRunView, FlowView, watch_flow_run
from prefect.client import Client
from prefect.engine.state import Skipped, State

# from prefect.engine.signals import PrefectStateSignal, signal_from_state
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
Expand Down Expand Up @@ -332,3 +335,54 @@ def run_flow_mapped(

class FailedSubFlow(Exception):
"""Erro para ser usado quando um subflow falha"""


def handler_skip_if_running_tolerant(tolerance_minutes: int):
"""
State handler that will skip a flow run if another instance of the flow is already running.
Adapted from Prefect Discourse:
https://tinyurl.com/4hn5uz2w
"""
if tolerance_minutes < 0:
tolerance_minutes = 0

def handler(obj, old_state: State, new_state: State) -> State:
if new_state.is_running():
logger = prefect.context.get("logger")
for i in range(tolerance_minutes + 1):
client = Client()
query = """
query($flow_id: uuid) {
flow_run(
where: {
_and: [
{state: {_eq: "Running"}},
{flow_id: {_eq: $flow_id}}
]
}
) {
id
}
}
"""
# pylint: disable=no-member
response = client.graphql(
query=query,
variables=dict(flow_id=prefect.context.flow_id),
)
active_flow_runs = response["data"]["flow_run"]
if active_flow_runs and i < tolerance_minutes:
logger.info(f"Attempt {i}")
time.sleep(60)
else:
break
if active_flow_runs:
message = (
"Skipping this flow run since there are already some flow runs in progress"
)
logger.info(message)
return Skipped(message)
return new_state

return handler

0 comments on commit c35e9b4

Please sign in to comment.