Migracao inicial #2

Open · wants to merge 68 commits into main

Changes from all commits (68 commits)
82cca31
feat: Add gdal to pyproject
PedroSiqueira1 Sep 2, 2024
c815ddb
bugfix: Add gdal version to poetry
PedroSiqueira1 Sep 3, 2024
36f3458
fix: Add GDAL dependencies to Dockerfile
PedroSiqueira1 Sep 3, 2024
52c8ce3
fix: Trying curl in dockerfile
PedroSiqueira1 Sep 3, 2024
dad88a1
fix: Install libgdal in docker file
PedroSiqueira1 Sep 3, 2024
85082a7
fix: Add g++ to Docker
PedroSiqueira1 Sep 3, 2024
268ab13
fix: Add C envs
PedroSiqueira1 Sep 3, 2024
e53a650
fix: Back to python 3.9 to get gdal to work
PedroSiqueira1 Sep 3, 2024
5e82779
fix: Change base stages
PedroSiqueira1 Sep 3, 2024
5bc413d
fix: Add path to wheel
PedroSiqueira1 Sep 3, 2024
7400f53
fix: Update cd_staging to use 3.9
PedroSiqueira1 Sep 3, 2024
381c349
fix: Add flows to __init__
PedroSiqueira1 Sep 3, 2024
2fce9bc
fix: Change init.py to add flows
PedroSiqueira1 Sep 3, 2024
3e4e908
fix: Change flow imports
PedroSiqueira1 Sep 3, 2024
f705505
fix: Add ignore to flake8
PedroSiqueira1 Sep 3, 2024
cabccdd
fix: Add W503 to flake8 ignore
PedroSiqueira1 Sep 3, 2024
99977af
fix: Add version to docker file and remove environment from workflows
PedroSiqueira1 Sep 4, 2024
1b63e2a
Merge branch 'main' into staging/migracao-inicial
gabriel-milan Sep 4, 2024
b562446
fix: Change label name from rj-cor to cor to match prefect agent
PedroSiqueira1 Sep 5, 2024
87c81b8
fix: Add openpyxl to poetry
PedroSiqueira1 Sep 5, 2024
64013e1
Dummy commiy to readd flows
PedroSiqueira1 Sep 6, 2024
aead2e2
fix: repo structure, add state handlers
gabriel-milan Sep 6, 2024
81acd7d
chore: update prefeitura-rio version
gabriel-milan Sep 6, 2024
fda87e5
chore: remove flow
gabriel-milan Sep 6, 2024
3ec0c74
fix: flow imports
gabriel-milan Sep 6, 2024
c6ab5a6
fix: Add new connection to redis
PedroSiqueira1 Sep 9, 2024
db66764
fix: circular dependency
PedroSiqueira1 Sep 9, 2024
f85630e
Dummy commit
PedroSiqueira1 Sep 10, 2024
5bcc05a
Dummy commit to fix infisical
PedroSiqueira1 Sep 10, 2024
e084552
fix: Change Vault for Infisical Acess
PedroSiqueira1 Sep 10, 2024
3620cc4
fix: Fix infisical acess
PedroSiqueira1 Sep 10, 2024
5108e07
fix: Change token access
PedroSiqueira1 Sep 10, 2024
3cba2ac
fix: Access to infisical in redemet
PedroSiqueira1 Sep 10, 2024
c0647df
fix: Change infisical keys
PedroSiqueira1 Sep 10, 2024
31ca0e5
fix: Fix alertario_rio url
PedroSiqueira1 Sep 10, 2024
eb33a00
Feat: Add log to redemet to see download shape
PedroSiqueira1 Sep 10, 2024
15466f3
feat: Add log data to redemet download_data task
PedroSiqueira1 Sep 12, 2024
464f21d
Dummy commit
PedroSiqueira1 Sep 13, 2024
1d1223a
Dummy commit 2, to rerun jobs
PedroSiqueira1 Sep 13, 2024
9fda93d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 13, 2024
c6e6792
feat: add log
PedroSiqueira1 Sep 13, 2024
3abf5b7
fix: get secrets right way
PedroSiqueira1 Sep 13, 2024
8f1eec9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 13, 2024
7d7b988
feat: Add cor agent to websirene
PedroSiqueira1 Sep 16, 2024
ddbc863
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 16, 2024
129377e
fix: Add log AlertaRio
PedroSiqueira1 Sep 17, 2024
72da30c
Merge branch 'main' of https://github.com/prefeitura-rio/pipelines_rj…
PedroSiqueira1 Sep 17, 2024
e5f746f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 17, 2024
96791a7
fix: Change poetry
PedroSiqueira1 Sep 17, 2024
df5b60e
fix: add missing libs
PedroSiqueira1 Sep 17, 2024
c51ffd9
fix: Import changes meterologia
PedroSiqueira1 Sep 17, 2024
c8c0938
Merge branch 'main' of https://github.com/prefeitura-rio/pipelines_rj…
PedroSiqueira1 Sep 18, 2024
09eb69f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 18, 2024
4073829
fix: change log alertario
PedroSiqueira1 Sep 19, 2024
2464be7
fix: add lxml dependecy
PedroSiqueira1 Sep 19, 2024
42cd5a0
fix: Change redis access
PedroSiqueira1 Sep 19, 2024
a13ff54
Merge branch 'main' of https://github.com/prefeitura-rio/pipelines_rj…
PedroSiqueira1 Sep 19, 2024
5d4bfd9
Merge branch 'main' into staging/migracao-inicial
mergify[bot] Sep 19, 2024
3b43a75
fix: Uncomment get_on_redis task
PedroSiqueira1 Sep 19, 2024
08a8397
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 19, 2024
4980ba8
Dummy commit to update flow
PedroSiqueira1 Sep 20, 2024
9eded84
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2024
8bdaa8b
feat: add save_on_redis task
PedroSiqueira1 Sep 20, 2024
5046b1b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2024
c2a888e
Merge branch 'main' of https://github.com/prefeitura-rio/pipelines_rj…
PedroSiqueira1 Sep 23, 2024
3252ba1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 23, 2024
9f0c621
fix: change default materialize_to_datario to false
PedroSiqueira1 Sep 23, 2024
8d6a0a8
fix: change default schedule parameters
PedroSiqueira1 Sep 23, 2024
1 change: 1 addition & 0 deletions .flake8
@@ -1,2 +1,3 @@
 [flake8]
+ignore = E203,F401,F403, W503
 max-line-length = 100
6 changes: 6 additions & 0 deletions pipelines/__init__.py
@@ -0,0 +1,6 @@
+# -*- coding: utf-8 -*-
+"""
+Imports all flows for every project so we can register all of them.
+"""
+from pipelines.meteorologia import *
+from pipelines.templates import *
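This top-level __init__.py only exposes every flow for registration if each package below it re-exports its flows the same way. A purely illustrative sketch of what a sub-package __init__.py is assumed to look like (the module names come from files touched in this PR, but the file contents themselves are not shown in the diff):

# -*- coding: utf-8 -*-
# Hypothetical sketch only: each sub-package presumably re-exports its flow
# modules so the star import above can reach them, e.g. in
# pipelines/meteorologia/__init__.py:
from pipelines.meteorologia.meteorologia_inmet.flows import *
from pipelines.meteorologia.meteorologia_redemet.flows import *
from pipelines.meteorologia.precipitacao_alertario.flows import *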
5 changes: 2 additions & 3 deletions pipelines/meteorologia/meteorologia_inmet/flows.py
@@ -9,6 +9,7 @@
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
 from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

 from pipelines.constants import constants
 from pipelines.meteorologia.meteorologia_inmet.schedules import hour_schedule
@@ -29,9 +30,7 @@

 with Flow(
 name="COR: Meteorologia - Meteorologia INMET",
-code_owners=[
-"paty",
-],
+state_handlers=[handler_inject_bd_credentials],
 ) as cor_meteorologia_meteorologia_inmet:
 DATASET_ID = "clima_estacao_meteorologica"
 TABLE_ID = "meteorologia_inmet"
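The change that repeats across the flow files below replaces the old code_owners metadata with a Prefect state handler that injects BD (basedosdados/BigQuery) credentials into the run. As a rough orientation, this is the Prefect 1.x state-handler contract the flows rely on; the handler body here is illustrative only, the real implementation lives in prefeitura_rio.pipelines_utils.state_handlers:

from prefect import Flow
from prefect.engine.state import State


def handler_inject_bd_credentials(obj: Flow, old_state: State, new_state: State) -> State:
    """Illustrative sketch: a Prefect 1.x state handler receives the flow and the
    old/new states on every transition and returns the (possibly modified) new
    state. The real handler injects basedosdados credentials when the run starts."""
    if new_state.is_running():
        pass  # assumption: fetch credentials (e.g. from Infisical) and export them as env vars
    return new_state


with Flow("example", state_handlers=[handler_inject_bd_credentials]) as example_flow:
    pass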
6 changes: 3 additions & 3 deletions pipelines/meteorologia/meteorologia_inmet/tasks.py
@@ -11,10 +11,11 @@
 import pendulum
 import requests
 from prefect import task
+from prefeitura_rio.pipelines_utils.infisical import get_secret

 from pipelines.constants import constants
 from pipelines.meteorologia.precipitacao_alertario.utils import parse_date_columns
-from pipelines.utils.utils import get_vault_secret, log, to_partitions
+from pipelines.utils.utils import log, to_partitions

 # from pipelines.rj_cor.meteorologia.meteorologia_inmet.meteorologia_utils import converte_timezone

@@ -79,8 +80,7 @@ def download(data_inicio: str, data_fim: str) -> pd.DataFrame:
 # no UTC, visto que ele só traria dados do novo dia e substituiria
 # no arquivo da partição do dia atual no nosso timezone

-dicionario = get_vault_secret("inmet_api")
-token = dicionario["data"]["token"]
+token = get_secret("INMET_API")["INMET_API"]

 raw = []
 for id_estacao in estacoes_unicas:
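The other recurring change in the task files is the secret lookup: Vault access through get_vault_secret, which nested the payload under "data", is replaced by the Infisical helper, which, as used throughout this PR, returns a mapping keyed by the secret name itself. A condensed sketch of the before/after pattern, using only the calls that appear in this diff:

from prefeitura_rio.pipelines_utils.infisical import get_secret

# Before (Vault): payload nested under "data"
# dicionario = get_vault_secret("inmet_api")
# token = dicionario["data"]["token"]

# After (Infisical), as used in this PR: the returned dict is keyed by the secret name
token = get_secret("INMET_API")["INMET_API"]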
7 changes: 3 additions & 4 deletions pipelines/meteorologia/meteorologia_redemet/flows.py
@@ -9,6 +9,7 @@
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
 from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

 from pipelines.constants import constants
 from pipelines.meteorologia.meteorologia_redemet.schedules import (
@@ -35,9 +36,7 @@

 with Flow(
 name="COR: Meteorologia - Meteorologia REDEMET",
-code_owners=[
-"paty",
-],
+state_handlers=[handler_inject_bd_credentials],
 ) as cor_meteorologia_meteorologia_redemet:
 DUMP_MODE = Parameter("dump_mode", default="append", required=True)
 DATASET_ID = Parameter("dataset_id", default="clima_estacao_meteorologica", required=True)
@@ -49,7 +48,7 @@

 # Materialization parameters
 MATERIALIZE_AFTER_DUMP = Parameter("materialize_after_dump", default=True, required=False)
-MATERIALIZE_TO_DATARIO = Parameter("materialize_to_datario", default=True, required=False)
+MATERIALIZE_TO_DATARIO = Parameter("materialize_to_datario", default=False, required=False)
 MATERIALIZATION_MODE = Parameter("mode", default="prod", required=False)

 # Dump to GCS after? Should only dump to GCS if materializing to datario
2 changes: 1 addition & 1 deletion pipelines/meteorologia/meteorologia_redemet/schedules.py
@@ -43,7 +43,7 @@
 parameter_defaults={
 "materialize_after_dump": True,
 "mode": "prod",
-"materialize_to_datario": True,
+"materialize_to_datario": False,
 "dump_to_gcs": False,
 # "dump_mode": "overwrite",
 # "dataset_id": "clima_estacao_meteorologica",
20 changes: 10 additions & 10 deletions pipelines/meteorologia/meteorologia_redemet/tasks.py
@@ -13,15 +13,11 @@
 from prefect import task
 from prefect.engine.signals import ENDRUN
 from prefect.engine.state import Failed
+from prefeitura_rio.pipelines_utils.infisical import get_secret
 from unidecode import unidecode

 from pipelines.constants import constants
-from pipelines.utils.utils import (
-get_vault_secret,
-log,
-parse_date_columns,
-to_partitions,
-)
+from pipelines.utils.utils import log, parse_date_columns, to_partitions


 @task(nout=3)
@@ -62,8 +58,7 @@ def download_data(first_date: str, last_date: str) -> pd.DataFrame:
 "SBSC",
 ]

-redemet_token = get_vault_secret("redemet-token")
-redemet_token = redemet_token["data"]["token"]
+redemet_token = get_secret("REDEMET-TOKEN")["REDEMET-TOKEN"]

 # Converte datas em int para cálculo de faixas.
 first_date_int = int(first_date.replace("-", ""))
@@ -80,6 +75,7 @@
 res = requests.get(url)
 if res.status_code != 200:
 log(f"Problema no id: {id_estacao}, {res.status_code}")
+log(f"Data: {data}, Hora: {hora}")
 continue
 res_data = json.loads(res.text)
 if res_data["status"] is not True:
@@ -90,12 +86,17 @@
 continue
 raw.append(res_data)

+log(f"Raw data size: {len(raw)}")
+
 # Extrai objetos de dataframe
 raw = [res_data["data"] for res_data in raw]

 # converte para dataframe
 dataframe = pd.DataFrame(raw)

+# Log dataframe size
+log(f"Daframe shape: {dataframe.shape}")
+
 return dataframe


@@ -205,8 +206,7 @@ def download_stations_data() -> pd.DataFrame:
 Download station information
 """

-redemet_token = get_vault_secret("redemet-token")
-redemet_token = redemet_token["data"]["token"]
+redemet_token = get_secret("REDEMET-TOKEN")["REDEMET-TOKEN"]
 base_url = f"https://api-redemet.decea.mil.br/aerodromos/?api_key={redemet_token}" # noqa
 url = f"{base_url}&pais=Brasil"
 res = requests.get(url)
6 changes: 2 additions & 4 deletions pipelines/meteorologia/precipitacao_alertario/flows.py
@@ -10,6 +10,7 @@
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
 from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

 from pipelines.constants import constants
 from pipelines.meteorologia.precipitacao_alertario.constants import (
@@ -42,10 +43,7 @@

 with Flow(
 name="COR: Meteorologia - Precipitacao e Meteorologia ALERTARIO",
-code_owners=[
-"paty",
-],
-# skip_if_running=True,
+state_handlers=[handler_inject_bd_credentials],
 ) as cor_meteorologia_precipitacao_alertario:
 DATASET_ID_PLUVIOMETRIC = alertario_constants.DATASET_ID_PLUVIOMETRIC.value
 TABLE_ID_PLUVIOMETRIC = alertario_constants.TABLE_ID_PLUVIOMETRIC.value
2 changes: 1 addition & 1 deletion pipelines/meteorologia/precipitacao_alertario/schedules.py
@@ -24,7 +24,7 @@
 "materialize_after_dump_old_api": True,
 "materialize_to_datario_old_api": True,
 "materialize_after_dump": True,
-"materialize_to_datario": True,
+"materialize_to_datario": False,
 "mode": "prod",
 "dump_to_gcs": False,
 },
5 changes: 2 additions & 3 deletions pipelines/meteorologia/precipitacao_alertario/tasks.py
@@ -13,6 +13,7 @@
 import requests
 from bs4 import BeautifulSoup
 from prefect import task
+from prefeitura_rio.pipelines_utils.infisical import get_secret

 from pipelines.constants import constants
 from pipelines.meteorologia.precipitacao_alertario.utils import (
@@ -23,7 +24,6 @@
 build_redis_key,
 compare_dates_between_tables_redis,
 get_redis_output,
-get_vault_secret,
 log,
 parse_date_columns,
 save_str_on_redis,
@@ -42,8 +42,7 @@ def download_data() -> pd.DataFrame:
 Request data from API and return each data in a different dataframe.
 """

-dicionario = get_vault_secret("alertario_api")
-url = dicionario["data"]["url"]
+url = get_secret("ALERTARIO_API")["ALERTARIO_API"]

 try:
 response = requests.get(url)
7 changes: 3 additions & 4 deletions pipelines/meteorologia/precipitacao_cemaden/flows.py
@@ -9,6 +9,7 @@
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
 from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

 from pipelines.constants import constants
 from pipelines.meteorologia.precipitacao_cemaden.constants import (
@@ -38,17 +39,15 @@

 with Flow(
 name="COR: Meteorologia - Precipitacao CEMADEN",
-code_owners=[
-"paty",
-],
+state_handlers=[handler_inject_bd_credentials],
 # skip_if_running=True,
 ) as cor_meteorologia_precipitacao_cemaden:
 DUMP_MODE = Parameter("dump_mode", default="append", required=True)
 DATASET_ID = Parameter("dataset_id", default="clima_pluviometro", required=True)
 TABLE_ID = Parameter("table_id", default="taxa_precipitacao_cemaden", required=True)
 # Materialization parameters
 MATERIALIZE_AFTER_DUMP = Parameter("materialize_after_dump", default=True, required=False)
-MATERIALIZE_TO_DATARIO = Parameter("materialize_to_datario", default=True, required=False)
+MATERIALIZE_TO_DATARIO = Parameter("materialize_to_datario", default=False, required=False)
 MATERIALIZATION_MODE = Parameter("mode", default="prod", required=False)
 TRIGGER_RAIN_DASHBOARD_UPDATE = Parameter(
 "trigger_rain_dashboard_update", default=False, required=False
2 changes: 1 addition & 1 deletion pipelines/meteorologia/precipitacao_cemaden/schedules.py
@@ -23,7 +23,7 @@
 # "trigger_rain_dashboard_update": True,
 "materialize_after_dump": True,
 "mode": "prod",
-"materialize_to_datario": True,
+"materialize_to_datario": False,
 "dump_to_gcs": False,
 "dump_mode": "append",
 "dataset_id": "clima_pluviometro",
Empty file.
7 changes: 3 additions & 4 deletions pipelines/meteorologia/precipitacao_inea/flows.py
@@ -9,6 +9,7 @@
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
 from prefect.tasks.prefect import create_flow_run
+from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

 from pipelines.constants import constants
 from pipelines.meteorologia.precipitacao_inea.schedules import minute_schedule
@@ -34,9 +35,7 @@

 with Flow(
 name="COR: Meteorologia - Precipitacao e Fluviometria INEA",
-code_owners=[
-"paty",
-],
+state_handlers=[handler_inject_bd_credentials],
 # skip_if_running=True,
 ) as cor_meteorologia_precipitacao_inea:
 DUMP_MODE = Parameter("dump_mode", default="append", required=True)
@@ -55,7 +54,7 @@

 # Materialization parameters
 MATERIALIZE_AFTER_DUMP = Parameter("materialize_after_dump", default=True, required=False)
-MATERIALIZE_TO_DATARIO = Parameter("materialize_to_datario", default=True, required=False)
+MATERIALIZE_TO_DATARIO = Parameter("materialize_to_datario", default=False, required=False)
 MATERIALIZATION_MODE = Parameter("mode", default="prod", required=False)

 # Dump to GCS after? Should only dump to GCS if materializing to datario
2 changes: 1 addition & 1 deletion pipelines/meteorologia/precipitacao_inea/schedules.py
@@ -23,7 +23,7 @@
 # "trigger_rain_dashboard_update": True,
 "materialize_after_dump": True,
 "mode": "prod",
-"materialize_to_datario": True,
+"materialize_to_datario": False,
 "dump_to_gcs": False,
 "dump_mode": "append",
 "dataset_id_pluviometric": "clima_pluviometro",
8 changes: 4 additions & 4 deletions pipelines/meteorologia/precipitacao_websirene/flows.py
@@ -7,6 +7,7 @@
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
 from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

 from pipelines.constants import constants
 from pipelines.meteorologia.precipitacao_websirene.schedules import MINUTE_SCHEDULE
@@ -24,9 +25,7 @@

 with Flow(
 "COR: Meteorologia - Precipitacao WEBSIRENE",
-code_owners=[
-"paty",
-],
+state_handlers=[handler_inject_bd_credentials],
 ) as cor_meteorologia_precipitacao_websirene:
 DATASET_ID = "clima_pluviometro"
 TABLE_ID = "taxa_precipitacao_websirene"
@@ -85,6 +84,7 @@
 # para rodar na cloud
 cor_meteorologia_precipitacao_websirene.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 cor_meteorologia_precipitacao_websirene.run_config = KubernetesRun(
-image=constants.DOCKER_IMAGE.value
+image=constants.DOCKER_IMAGE.value,
+labels=[constants.RJ_COR_AGENT_LABEL.value],
 )
 cor_meteorologia_precipitacao_websirene.schedule = MINUTE_SCHEDULE
2 changes: 1 addition & 1 deletion pipelines/meteorologia/radar/mendanha/flows.py
@@ -7,7 +7,7 @@
 from prefect import Parameter
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
-from prefeitura_rio.pipelines_utils.custom import Flow # pylint: disable=E0611, E0401
+from prefeitura_rio.pipelines_utils.custom import Flow
 from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

 from pipelines.constants import constants
1 change: 1 addition & 0 deletions pipelines/meteorologia/satelite/flows.py
@@ -9,6 +9,7 @@
 from prefect import Parameter, case
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
+from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
 from prefeitura_rio.pipelines_utils.custom import Flow # pylint: disable=E0611, E0401
 from prefeitura_rio.pipelines_utils.state_handlers import (
 handler_initialize_sentry,
Empty file.
Empty file.