From 9806c8abe8669c10de2dab8111877a97ba41d267 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Tue, 11 Jun 2024 16:20:26 -0300 Subject: [PATCH] refactor: patient mrg endpoint refactor to use bulk insert --- api/app/routers/entities_mrg.py | 234 +++++++++++--------------------- api/app/utils.py | 6 +- 2 files changed, 85 insertions(+), 155 deletions(-) diff --git a/api/app/routers/entities_mrg.py b/api/app/routers/entities_mrg.py index 30e1cd9..b572112 100644 --- a/api/app/routers/entities_mrg.py +++ b/api/app/routers/entities_mrg.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import asyncio import asyncpg -from typing import Annotated +from typing import Annotated, List from app.utils import generate_dictionary_fingerprint, merge_versions from fastapi import APIRouter, Depends @@ -26,6 +26,7 @@ ConditionCode, PatientCns, ) +from app.utils import update_and_return router = APIRouter(prefix="/mrg", tags=["Entidades MRG (Formato Merged/Fundido)"]) @@ -38,162 +39,87 @@ @router.put("/patient") async def create_or_update_patient( _: Annotated[User, Depends(get_current_active_user)], - patients: list[PatientModel], + patients: List[PatientModel], ) -> list[PatientOutput]: - - async def process_patient(patient_data): - # Remove null values - patient_data = {keys:values for keys, values in patient_data.items() if values is not None} - - birth_city_task = City.get_or_none( - code=patient_data.get("birth_city") - ) - race_task = Race.get_or_none(slug=patient_data.get("race")) - gender_task = Gender.get_or_none(slug=patient_data.get("gender")) - nationality_task = Nationality.get_or_none(slug=patient_data.get("nationality")) - - birth_city, race, gender, nationality = await asyncio.gather( - birth_city_task, race_task, gender_task, nationality_task - ) - - new_data = { - "patient_cpf": patient_data.get("patient_cpf"), - "patient_code": patient_data.get("patient_code"), - "active": patient_data.get("active"), - "protected_person": patient_data.get("protected_person"), - "deceased": patient_data.get("deceased"), - "deceased_date": patient_data.get("deceased_date"), - "name": patient_data.get("name"), - "mother_name": patient_data.get("mother_name"), - "father_name": patient_data.get("father_name"), - "birth_date": patient_data.get("birth_date").isoformat(), - "birth_city": birth_city, - "race": race, - "gender": gender, - "nationality": nationality, - } - - async with in_transaction(): - patient = await Patient.get_or_none( - patient_cpf=patient_data["patient_cpf"] - ).prefetch_related("address_patient_periods", "telecom_patient_periods", "patient_cns") - - if patient: - await patient.update_from_dict(new_data).save() - else: - patient = await Patient.create(**new_data) - - # ------------------------- - # Address Update - # ------------------------- - # Generate Fingerprints - addresses = patient_data.get("address_list", []) - for address in addresses: - address["fingerprint"] = generate_dictionary_fingerprint(address) - - # Plan the Update of Patient Addresses - deletions, insertions = merge_versions( - patient.address_patient_periods.related_objects, - addresses - ) - - # Delete - deletions_tasks = [address.delete() for address in deletions] - await asyncio.gather(*deletions_tasks) - - # Inserts - get_city_tasks = [ - City.get_or_none(code=address["city"]) - for address in insertions - ] - address_cities = await asyncio.gather(*get_city_tasks) - - insert_address_tasks = [] - for address, city in zip(insertions, address_cities): - address["patient"] = patient - address["city"] = city - address["period_start"] = address.get("start") - address["period_end"] = address.get("end") - - insert_address_tasks.append( - PatientAddress.create(**address) - ) - - await asyncio.gather(*insert_address_tasks) - - # ------------------------- - # Telecom Update - # ------------------------- - telecoms = patient_data.get("telecom_list", []) - for telecom in telecoms: - telecom["fingerprint"] = generate_dictionary_fingerprint(telecom) - - # Plan the Update of Patient Telecoms - deletions, insertions = merge_versions( - patient.telecom_patient_periods.related_objects, - telecoms - ) - - # Delete - deletions_tasks = [obj.delete() for obj in deletions] - await asyncio.gather(*deletions_tasks) - - # Inserts - insert_telecom_tasks = [] - for telecom in insertions: - telecom["patient"] = patient + + races = {x.slug: x for x in await Race.all()} + cities = {x.code: x for x in await City.all()} + genders = {x.slug: x for x in await Gender.all()} + nationalities = {x.slug: x for x in await Nationality.all()} + + patients = [patient.dict() for patient in patients] + + addresses, cnss, telecoms = [], [], [] + for patient in patients: + # Entity Splitting + addresses.append(patient.pop('address_list')) + cnss.append(patient.pop('cns_list')) + telecoms.append(patient.pop('telecom_list')) + + # Object Convertions + patient['race'] = races.get(patient['race']) + patient['birth_city'] = cities.get(patient.get('birth_city')) + patient['gender'] = genders.get(patient.get('gender')) + patient['nationality'] = nationalities.get(patient.get('nationality')) + patient['birth_date'] = patient['birth_date'].isoformat() + + existing_patients = [ + Patient.get_or_none( + patient_code=x['patient_code'] + ).prefetch_related("address_patient_periods", "telecom_patient_periods", "patient_cns") + for x in patients + ] + existing_patients = await asyncio.gather(*existing_patients) + + awaitables = [] + for i, patient in enumerate(patients): + if existing_patients[i]: + awaitables.append(update_and_return(existing_patients[i], patient)) + else: + awaitables.append(Patient.create(**patient)) + modified_patients = await asyncio.gather(*awaitables) + + async def update_addresses(): + addresses_to_insert = [] + for i, address_list in enumerate(addresses): + for address in address_list: + address["patient"] = modified_patients[i] + address["city"] = cities.get(address.pop("city")) + address["period_start"] = address.pop("start") + address["period_end"] = address.pop("end") + addresses_to_insert.append(PatientAddress(**address)) + await PatientAddress.filter(patient_id__in=[x.id for x in modified_patients]).delete() + await PatientAddress.bulk_create(addresses_to_insert) + + async def update_telecoms(): + telecoms_to_insert = [] + for i, telecom_list in enumerate(telecoms): + for telecom in telecom_list: + telecom["patient"] = modified_patients[i] telecom["period_start"] = telecom.get("start") telecom["period_end"] = telecom.get("end") - insert_telecom_tasks.append( - PatientTelecom.create(**telecom) - ) - await asyncio.gather(*insert_telecom_tasks) - - # ------------------------- - # CNS Update - # ------------------------- - cnss = patient_data.get("cns_list", []) - for cns in cnss: - cns["fingerprint"] = generate_dictionary_fingerprint(cns) - - # Plan the Update of Patient CNSs - deletions, insertions = merge_versions( - patient.patient_cns.related_objects, - cnss - ) - - # Delete - deletions_tasks = [obj.delete() for obj in deletions] - await asyncio.gather(*deletions_tasks) - - # Detect Duplicated CNSs - duplicated_cnss = await PatientCns.filter( - value__in=[x['value'] for x in cnss] - ).exclude( - patient=patient - ) - untrusted_cnss = [x.value for x in duplicated_cnss] - - # Delete Duplicated CNSs - deletions_tasks = [obj.delete() for obj in duplicated_cnss] - await asyncio.gather(*deletions_tasks) - - # Inserts - insert_cns_tasks = [] - for cns in insertions: - if cns["value"] in untrusted_cnss: - continue - cns["patient"] = patient - insert_cns_tasks.append( - PatientCns.create(**cns) - ) - await asyncio.gather(*insert_cns_tasks) - - return await PatientOutput.from_tortoise_orm(patient) - - patient_tasks = [process_patient(patient.dict()) for patient in patients] - updated_patients = await asyncio.gather(*patient_tasks) - return updated_patients + telecoms_to_insert.append(PatientTelecom(**telecom)) + await PatientTelecom.filter(patient_id__in=[x.id for x in modified_patients]).delete() + await PatientTelecom.bulk_create(telecoms_to_insert) + + async def update_cnss(): + async def create_cns(cns_params): + try: + await PatientCns.create(**cns_params) + except IntegrityError: + await PatientCns.get(value=cns_params["value"]).delete() + + cns_creation_tasks = [] + for i, cns_list in enumerate(cnss): + for cns in cns_list: + cns["patient"] = modified_patients[i] + cns_creation_tasks.append(create_cns(cns)) + await PatientCns.filter(patient_id__in=[x.id for x in modified_patients]).delete() + await asyncio.gather(*cns_creation_tasks) + + await asyncio.gather(*[update_cnss(), update_addresses(), update_telecoms()]) + + return modified_patients @router.put("/patientcondition") diff --git a/api/app/utils.py b/api/app/utils.py index 8c55f11..d1521e0 100644 --- a/api/app/utils.py +++ b/api/app/utils.py @@ -109,4 +109,8 @@ def merge_versions(current_objs, new_objs: dict) -> None: for fingerprint in to_add ] - return deletions, insertions \ No newline at end of file + return deletions, insertions + +async def update_and_return(instance, new_data): + await instance.update_from_dict(new_data).save() + return instance \ No newline at end of file