Skip to content

Commit

Permalink
merge with main that has update radar pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
patriciacatandi committed Sep 20, 2024
2 parents e56cd8b + 724dbe1 commit fd71fe1
Show file tree
Hide file tree
Showing 19 changed files with 732 additions and 742 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Setup Python version
uses: actions/setup-python@v2
with:
python-version: "3.10"
python-version: "3.9"

- name: Setup Google Cloud credentials
uses: google-github-actions/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/code-tree-analysis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- name: Setup Python version
uses: actions/setup-python@v2
with:
python-version: "3.10"
python-version: "3.9"

- name: Install Python dependencies for deploying
run: |-
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e
with:
python-version: "3.10"
python-version: "3.9"

- name: Set up Poetry and upgrade pip
run: |
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repos:
rev: 22.12.0
hooks:
- id: black
language_version: python3.10
language_version: python3.9

- repo: https://github.com/PyCQA/isort
rev: 5.12.0
Expand Down
94 changes: 59 additions & 35 deletions pipelines/meteorologia/radar/mendanha/flows.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
# -*- coding: utf-8 -*-
# flake8: noqa: E501
# pylint: disable=C0103
# pylint: disable=C0103, C0301
"""
Flows for setting rain dashboard using radar data.
"""
from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS

# sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../prefeitura-rio")))
# # sys.path.insert(0, '/home/patricia/Documentos/escritorio_dados/prefeitura-rio/prefeitura-rio')
# print("sys.path:", sys.path)
from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.custom import Flow # pylint: disable=E0611, E0401
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.constants import constants
from pipelines.meteorologia.radar.mendanha.constants import (
constants as radar_constants, # pylint: disable=E0611, E0401
)

# from pipelines.tasks import task_get_redis_client
# from pipelines.meteorologia.radar.mendanha.schedules import TIME_SCHEDULE
from pipelines.meteorologia.radar.mendanha.constants import constants as radar_constants
from pipelines.meteorologia.radar.mendanha.tasks import ( # prefix_to_restore,; save_data,; create_visualization_with_background,; get_storage_destination,; upload_file_to_storage,; prefix_to_restore,; save_data,
from pipelines.meteorologia.radar.mendanha.schedules import ( # pylint: disable=E0611, E0401
TIME_SCHEDULE,
)
from pipelines.meteorologia.radar.mendanha.tasks import ( # pylint: disable=E0611, E0401
access_api,
add_new_image,
base64_to_bytes,
Expand All @@ -31,28 +33,28 @@
get_colorbar_title,
get_filenames_storage,
get_radar_parameters,
get_storage_destination,
img_to_base64,
remap_data,
rename_keys_redis,
save_images_to_local,
save_img_on_redis,
send_zip_images_api,
upload_file_to_storage,
)
from pipelines.utils_rj_cor import build_redis_key

# # Adiciona o diretório `/algum/diretorio/` ao sys.path
# import os, sys # noqa


# from prefeitura_rio.pipelines_utils.tasks import create_table_and_upload_to_gcs

# from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from pipelines.tasks import ( # pylint: disable=E0611, E0401
task_build_redis_hash,
task_get_redis_client,
task_get_redis_output,
task_save_on_redis,
)

# create_visualization_with_background, prefix_to_restore, save_data,
# from pipelines.utils_rj_cor import build_redis_hash # pylint: disable=E0611, E0401

# from pipelines.tasks import (
# get_on_redis,
# save_on_redis,
# )

# from pipelines.utils.tasks import create_table_and_upload_to_gcs

Expand All @@ -76,8 +78,24 @@
# BASE_PATH = "pipelines/rj_cor/meteorologia/radar/precipitacao/"
BUCKET_NAME = "rj-escritorio-scp"

# redis_data_key = Parameter("redis_data_key", default="data_last_15min_rain")
# redis_update_key = Parameter(
# "redis_update_key", default="data_last_15min_rain_update"
# )
# redis_host = Parameter("redis_host", default="redis.redis.svc.cluster.local")
# redis_port = Parameter("redis_port", default=6379)
# redis_db = Parameter("redis_db", default=1)

redis_client = task_get_redis_client(infisical_secrets_path="/redis")
redis_hash = task_build_redis_hash(DATASET_ID, TABLE_ID, name="images", mode=MODE)
redis_hash_processed = task_build_redis_hash(
DATASET_ID, TABLE_ID, name="processed_images", mode=MODE
)
files_saved_redis = task_get_redis_output(redis_client, redis_key=redis_hash_processed)
# files_saved_redis = get_on_redis(DATASET_ID, TABLE_ID, mode=MODE)
files_on_storage_list = get_filenames_storage(BUCKET_NAME, files_saved_redis=[])
files_on_storage_list, files_to_save_redis = get_filenames_storage(
BUCKET_NAME, files_saved_redis=files_saved_redis
)

radar_files = download_files_storage(
bucket_name=BUCKET_NAME,
Expand All @@ -89,7 +107,7 @@
radar_2d = remap_data(combined_radar, RADAR_PRODUCT_LIST, grid_shape, grid_limits)

# Create visualizations
formatted_time = get_and_format_time(radar_files)
formatted_time, filename_time = get_and_format_time(radar_files)
cbar_title = get_colorbar_title(RADAR_PRODUCT_LIST[0])
fig = create_visualization_no_background(
radar_2d, radar_product=RADAR_PRODUCT_LIST[0], cbar_title=cbar_title, title=formatted_time
Expand All @@ -99,22 +117,30 @@
img_bytes = base64_to_bytes(img_base64)

# update the name of images that are already on redis and save them as png
redis_hash = build_redis_key(DATASET_ID, TABLE_ID, name="images", mode=MODE)
img_base64_dict = rename_keys_redis(redis_hash, img_bytes)
all_img_base64_dict = add_new_image(img_base64_dict, img_bytes)
saved_images_path = save_images_to_local(all_img_base64_dict, folder="images")

save_img_on_redis(
redis_hash, "radar_020.png", img_bytes
redis_hash, "radar_020.png", img_bytes, saved_images_path
) # esperar baixar imagens que já estão no redis
save_img_on_redis.set_upstream(saved_images_path)
# save_img_on_redis.set_upstream(saved_images_path)

zip_filename = compress_to_zip("/images.zip", saved_images_path)

api = access_api()
send_zip_images_api(api, "uploadfile", zip_filename)
access_api.set_upstream(saved_images_path)
response = send_zip_images_api(api, "uploadfile", zip_filename)

saved_last_img_path = save_images_to_local({filename_time: img_bytes}, folder="last_image")
destination_blob_name, source_file_name = get_storage_destination(
filename_time, saved_last_img_path
)
upload_file_to_storage(
project="datario",
bucket_name="datario-public",
destination_blob_name=destination_blob_name,
source_file_name=source_file_name,
)
# fig_with_backgroud = create_visualization_with_background(
# radar_2d, radar_product=RADAR_PRODUCT_LIST[0], cbar_title=cbar_title, title=formatted_time
# )
Expand All @@ -123,7 +149,6 @@
# saved_with_background_img_path = save_images_to_local(
# {formatted_time: img_bytes_with_backgroud}
# )

# destination_blob_name, source_file_name = get_storage_destination(
# formatted_time, saved_with_background_img_path
# )
Expand All @@ -134,15 +159,14 @@
# source_file_name=source_file_name,
# )

# # Save new filenames on redis
# save_last_update_redis = save_on_redis(
# DATASET_ID,
# TABLE_ID,
# MODE,
# files_on_storage_list,
# keep_last=3,
# # wait=upload_table,
# )
# Save new filenames on redis
save_last_update_redis = task_save_on_redis(
redis_client=redis_client,
values=files_to_save_redis,
redis_key=redis_hash_processed,
keep_last=30,
wait=response,
)
# save_last_update_redis.set_upstream(upload_table)


Expand All @@ -156,4 +180,4 @@
memory_limit="3Gi",
)

# cor_meteorologia_refletividade_radar_flow.schedule = TIME_SCHEDULE
cor_meteorologia_refletividade_radar_flow.schedule = TIME_SCHEDULE
2 changes: 1 addition & 1 deletion pipelines/meteorologia/radar/mendanha/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
TIME_SCHEDULE = Schedule(
clocks=[
IntervalClock(
interval=timedelta(minutes=2),
interval=timedelta(minutes=5),
start_date=datetime(2021, 1, 1, 0, 0, 30),
labels=[
constants.RJ_COR_AGENT_LABEL.value,
Expand Down
Loading

0 comments on commit fd71fe1

Please sign in to comment.