Commit

Merge pull request #165 from prefeitura-rio/feat/safe-upload-to-datalake
Feat/safe upload to datalake
TanookiVerde committed Jul 26, 2024
2 parents bb54dd8 + 959f448 commit eb74622
Showing 2 changed files with 34 additions and 23 deletions.
1 change: 1 addition & 0 deletions api/app/pydantic_models.py
@@ -75,6 +75,7 @@ class RawDataListModel(BaseModel):

class BulkInsertOutputModel(BaseModel):
    count: int
    uploaded_to_datalake: Optional[bool] = False


class ConditionListModel(BaseModel):
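The new optional field lets the bulk-insert response report whether the datalake upload succeeded, while the existing count field keeps its meaning. A minimal sketch of the resulting payload, assuming Pydantic v1-style .dict() serialization as used elsewhere in this codebase:

from typing import Optional

from pydantic import BaseModel


class BulkInsertOutputModel(BaseModel):
    count: int
    uploaded_to_datalake: Optional[bool] = False


# Records were saved, but the datalake upload was skipped or failed:
print(BulkInsertOutputModel(count=42).dict())
# {'count': 42, 'uploaded_to_datalake': False}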
56 changes: 33 additions & 23 deletions api/app/routers/entities_raw.py
@@ -8,6 +8,7 @@
from typing import Annotated, Literal
from fastapi import APIRouter, Depends
from fastapi.responses import HTMLResponse
from loguru import logger
from tortoise.exceptions import ValidationError

from app.pydantic_models import RawDataListModel, BulkInsertOutputModel, RawDataModel
@@ -75,26 +76,6 @@ async def create_raw_data(
    records = raw_data.dict().get("data_list")
    data_source = await DataSource.get(cnes=raw_data.cnes)

    # ====================
    # SEND TO DATALAKE
    # ====================
    formatter = get_formatter(
        system=data_source.system.value,
        entity=entity_name
    )

    if upload_to_datalake and formatter:
        uploader = DatalakeUploader(
            dump_mode="append",
            force_unique_file_name=True,
        )

        for config, dataframe in apply_formatter(records, formatter).items():
            uploader.upload(
                dataframe=dataframe,
                **convert_model_config_to_dict(config)
            )

    # ====================
    # SAVE IN HCI DATABASE
    # ====================
@@ -115,14 +96,43 @@ async def create_raw_data(
            )
        except ValidationError as e:
            return HTMLResponse(status_code=400, content=str(e))

    try:
        new_records = await Entity.bulk_create(records_to_create, ignore_conflicts=True)
        return {
            "count": len(new_records),
        }
    except asyncpg.exceptions.DeadlockDetectedError as e:
        return HTMLResponse(status_code=400, content=str(e))

    # ====================
    # SEND TO DATALAKE
    # ====================
    uploaded_to_datalake = False

    formatter = get_formatter(
        system=data_source.system.value,
        entity=entity_name
    )

    try:
        if upload_to_datalake and formatter:
            uploader = DatalakeUploader(
                dump_mode="append",
                force_unique_file_name=True,
            )

            for config, dataframe in apply_formatter(records, formatter).items():
                uploader.upload(
                    dataframe=dataframe,
                    **convert_model_config_to_dict(config)
                )
            uploaded_to_datalake = True
    except Exception as e:
        logger.error(f"Error uploading to datalake: {e}")
    finally:
        return BulkInsertOutputModel(
            count=len(new_records),
            uploaded_to_datalake=uploaded_to_datalake,
        )


@router.post("/{entity_name}/setAsInvalid", status_code=200)
async def set_as_invalid_flag_records(
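Taken together, the reordering in entities_raw.py means records are persisted to the HCI database first and the datalake upload is attempted afterwards; an upload failure is logged and surfaced as uploaded_to_datalake=False instead of failing the whole request. A distilled sketch of that pattern follows; the helpers save_records and upload_records are hypothetical stand-ins for Entity.bulk_create and DatalakeUploader, not part of this PR:

import asyncio

from loguru import logger


async def save_records(records: list[dict]) -> list[dict]:
    # Stand-in for Entity.bulk_create(...): pretend every record was inserted.
    return records


async def upload_records(records: list[dict]) -> None:
    # Stand-in for DatalakeUploader.upload(...): simulate an upload failure.
    raise RuntimeError("datalake unavailable")


async def bulk_insert(records: list[dict]) -> dict:
    new_records = await save_records(records)

    uploaded_to_datalake = False
    try:
        await upload_records(new_records)
        uploaded_to_datalake = True
    except Exception as e:
        # Upload failures are logged but do not undo the database insert.
        logger.error(f"Error uploading to datalake: {e}")

    return {"count": len(new_records), "uploaded_to_datalake": uploaded_to_datalake}


print(asyncio.run(bulk_insert([{"id": 1}])))
# {'count': 1, 'uploaded_to_datalake': False}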
