diff --git a/api/app/pydantic_models.py b/api/app/pydantic_models.py
index 3e76d23..3a8efdd 100644
--- a/api/app/pydantic_models.py
+++ b/api/app/pydantic_models.py
@@ -75,6 +75,7 @@ class RawDataListModel(BaseModel):
 
 class BulkInsertOutputModel(BaseModel):
     count: int
+    uploaded_to_datalake: Optional[bool] = False
 
 
 class ConditionListModel(BaseModel):
diff --git a/api/app/routers/entities_raw.py b/api/app/routers/entities_raw.py
index b67a1c0..2641d43 100644
--- a/api/app/routers/entities_raw.py
+++ b/api/app/routers/entities_raw.py
@@ -8,6 +8,7 @@ from typing import Annotated, Literal
 
 from fastapi import APIRouter, Depends
 from fastapi.responses import HTMLResponse
+from loguru import logger
 from tortoise.exceptions import ValidationError
 
 from app.pydantic_models import RawDataListModel, BulkInsertOutputModel, RawDataModel
@@ -75,26 +76,6 @@ async def create_raw_data(
     records = raw_data.dict().get("data_list")
     data_source = await DataSource.get(cnes=raw_data.cnes)
 
-    # ====================
-    # SEND TO DATALAKE
-    # ====================
-    formatter = get_formatter(
-        system=data_source.system.value,
-        entity=entity_name
-    )
-
-    if upload_to_datalake and formatter:
-        uploader = DatalakeUploader(
-            dump_mode="append",
-            force_unique_file_name=True,
-        )
-
-        for config, dataframe in apply_formatter(records, formatter).items():
-            uploader.upload(
-                dataframe=dataframe,
-                **convert_model_config_to_dict(config)
-            )
-
     # ====================
     # SAVE IN HCI DATABASE
     # ====================
@@ -115,14 +96,43 @@ async def create_raw_data(
         )
     except ValidationError as e:
         return HTMLResponse(status_code=400, content=str(e))
+
     try:
         new_records = await Entity.bulk_create(records_to_create, ignore_conflicts=True)
-        return {
-            "count": len(new_records),
-        }
     except asyncpg.exceptions.DeadlockDetectedError as e:
         return HTMLResponse(status_code=400, content=str(e))
 
+    # ====================
+    # SEND TO DATALAKE
+    # ====================
+    uploaded_to_datalake = False
+
+    formatter = get_formatter(
+        system=data_source.system.value,
+        entity=entity_name
+    )
+
+    try:
+        if upload_to_datalake and formatter:
+            uploader = DatalakeUploader(
+                dump_mode="append",
+                force_unique_file_name=True,
+            )
+
+            for config, dataframe in apply_formatter(records, formatter).items():
+                uploader.upload(
+                    dataframe=dataframe,
+                    **convert_model_config_to_dict(config)
+                )
+            uploaded_to_datalake = True
+    except Exception as e:
+        logger.error(f"Error uploading to datalake: {e}")
+    finally:
+        return BulkInsertOutputModel(
+            count=len(new_records),
+            uploaded_to_datalake=uploaded_to_datalake,
+        )
+
 
 @router.post("/{entity_name}/setAsInvalid", status_code=200)
 async def set_as_invalid_flag_records(