Skip to content

Commit

Permalink
refactor: patient mrg endpoint refactor to use bulk insert
Browse files Browse the repository at this point in the history
  • Loading branch information
TanookiVerde committed Jun 11, 2024
1 parent 2a0acab commit 9806c8a
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 155 deletions.
234 changes: 80 additions & 154 deletions api/app/routers/entities_mrg.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import asyncio
import asyncpg
from typing import Annotated
from typing import Annotated, List

from app.utils import generate_dictionary_fingerprint, merge_versions
from fastapi import APIRouter, Depends
Expand All @@ -26,6 +26,7 @@
ConditionCode,
PatientCns,
)
from app.utils import update_and_return


router = APIRouter(prefix="/mrg", tags=["Entidades MRG (Formato Merged/Fundido)"])
Expand All @@ -38,162 +39,87 @@
@router.put("/patient")
async def create_or_update_patient(
_: Annotated[User, Depends(get_current_active_user)],
patients: list[PatientModel],
patients: List[PatientModel],
) -> list[PatientOutput]:

async def process_patient(patient_data):
# Remove null values
patient_data = {keys:values for keys, values in patient_data.items() if values is not None}

birth_city_task = City.get_or_none(
code=patient_data.get("birth_city")
)
race_task = Race.get_or_none(slug=patient_data.get("race"))
gender_task = Gender.get_or_none(slug=patient_data.get("gender"))
nationality_task = Nationality.get_or_none(slug=patient_data.get("nationality"))

birth_city, race, gender, nationality = await asyncio.gather(
birth_city_task, race_task, gender_task, nationality_task
)

new_data = {
"patient_cpf": patient_data.get("patient_cpf"),
"patient_code": patient_data.get("patient_code"),
"active": patient_data.get("active"),
"protected_person": patient_data.get("protected_person"),
"deceased": patient_data.get("deceased"),
"deceased_date": patient_data.get("deceased_date"),
"name": patient_data.get("name"),
"mother_name": patient_data.get("mother_name"),
"father_name": patient_data.get("father_name"),
"birth_date": patient_data.get("birth_date").isoformat(),
"birth_city": birth_city,
"race": race,
"gender": gender,
"nationality": nationality,
}

async with in_transaction():
patient = await Patient.get_or_none(
patient_cpf=patient_data["patient_cpf"]
).prefetch_related("address_patient_periods", "telecom_patient_periods", "patient_cns")

if patient:
await patient.update_from_dict(new_data).save()
else:
patient = await Patient.create(**new_data)

# -------------------------
# Address Update
# -------------------------
# Generate Fingerprints
addresses = patient_data.get("address_list", [])
for address in addresses:
address["fingerprint"] = generate_dictionary_fingerprint(address)

# Plan the Update of Patient Addresses
deletions, insertions = merge_versions(
patient.address_patient_periods.related_objects,
addresses
)

# Delete
deletions_tasks = [address.delete() for address in deletions]
await asyncio.gather(*deletions_tasks)

# Inserts
get_city_tasks = [
City.get_or_none(code=address["city"])
for address in insertions
]
address_cities = await asyncio.gather(*get_city_tasks)

insert_address_tasks = []
for address, city in zip(insertions, address_cities):
address["patient"] = patient
address["city"] = city
address["period_start"] = address.get("start")
address["period_end"] = address.get("end")

insert_address_tasks.append(
PatientAddress.create(**address)
)

await asyncio.gather(*insert_address_tasks)

# -------------------------
# Telecom Update
# -------------------------
telecoms = patient_data.get("telecom_list", [])
for telecom in telecoms:
telecom["fingerprint"] = generate_dictionary_fingerprint(telecom)

# Plan the Update of Patient Telecoms
deletions, insertions = merge_versions(
patient.telecom_patient_periods.related_objects,
telecoms
)

# Delete
deletions_tasks = [obj.delete() for obj in deletions]
await asyncio.gather(*deletions_tasks)

# Inserts
insert_telecom_tasks = []
for telecom in insertions:
telecom["patient"] = patient

races = {x.slug: x for x in await Race.all()}
cities = {x.code: x for x in await City.all()}
genders = {x.slug: x for x in await Gender.all()}
nationalities = {x.slug: x for x in await Nationality.all()}

patients = [patient.dict() for patient in patients]

addresses, cnss, telecoms = [], [], []
for patient in patients:
# Entity Splitting
addresses.append(patient.pop('address_list'))
cnss.append(patient.pop('cns_list'))
telecoms.append(patient.pop('telecom_list'))

# Object Convertions
patient['race'] = races.get(patient['race'])
patient['birth_city'] = cities.get(patient.get('birth_city'))
patient['gender'] = genders.get(patient.get('gender'))
patient['nationality'] = nationalities.get(patient.get('nationality'))
patient['birth_date'] = patient['birth_date'].isoformat()

existing_patients = [
Patient.get_or_none(
patient_code=x['patient_code']
).prefetch_related("address_patient_periods", "telecom_patient_periods", "patient_cns")
for x in patients
]
existing_patients = await asyncio.gather(*existing_patients)

awaitables = []
for i, patient in enumerate(patients):
if existing_patients[i]:
awaitables.append(update_and_return(existing_patients[i], patient))
else:
awaitables.append(Patient.create(**patient))
modified_patients = await asyncio.gather(*awaitables)

async def update_addresses():
addresses_to_insert = []
for i, address_list in enumerate(addresses):
for address in address_list:
address["patient"] = modified_patients[i]
address["city"] = cities.get(address.pop("city"))
address["period_start"] = address.pop("start")
address["period_end"] = address.pop("end")
addresses_to_insert.append(PatientAddress(**address))
await PatientAddress.filter(patient_id__in=[x.id for x in modified_patients]).delete()
await PatientAddress.bulk_create(addresses_to_insert)

async def update_telecoms():
telecoms_to_insert = []
for i, telecom_list in enumerate(telecoms):
for telecom in telecom_list:
telecom["patient"] = modified_patients[i]
telecom["period_start"] = telecom.get("start")
telecom["period_end"] = telecom.get("end")
insert_telecom_tasks.append(
PatientTelecom.create(**telecom)
)
await asyncio.gather(*insert_telecom_tasks)

# -------------------------
# CNS Update
# -------------------------
cnss = patient_data.get("cns_list", [])
for cns in cnss:
cns["fingerprint"] = generate_dictionary_fingerprint(cns)

# Plan the Update of Patient CNSs
deletions, insertions = merge_versions(
patient.patient_cns.related_objects,
cnss
)

# Delete
deletions_tasks = [obj.delete() for obj in deletions]
await asyncio.gather(*deletions_tasks)

# Detect Duplicated CNSs
duplicated_cnss = await PatientCns.filter(
value__in=[x['value'] for x in cnss]
).exclude(
patient=patient
)
untrusted_cnss = [x.value for x in duplicated_cnss]

# Delete Duplicated CNSs
deletions_tasks = [obj.delete() for obj in duplicated_cnss]
await asyncio.gather(*deletions_tasks)

# Inserts
insert_cns_tasks = []
for cns in insertions:
if cns["value"] in untrusted_cnss:
continue
cns["patient"] = patient
insert_cns_tasks.append(
PatientCns.create(**cns)
)
await asyncio.gather(*insert_cns_tasks)

return await PatientOutput.from_tortoise_orm(patient)

patient_tasks = [process_patient(patient.dict()) for patient in patients]
updated_patients = await asyncio.gather(*patient_tasks)
return updated_patients
telecoms_to_insert.append(PatientTelecom(**telecom))
await PatientTelecom.filter(patient_id__in=[x.id for x in modified_patients]).delete()
await PatientTelecom.bulk_create(telecoms_to_insert)

async def update_cnss():
async def create_cns(cns_params):
try:
await PatientCns.create(**cns_params)
except IntegrityError:
await PatientCns.get(value=cns_params["value"]).delete()

cns_creation_tasks = []
for i, cns_list in enumerate(cnss):
for cns in cns_list:
cns["patient"] = modified_patients[i]
cns_creation_tasks.append(create_cns(cns))
await PatientCns.filter(patient_id__in=[x.id for x in modified_patients]).delete()
await asyncio.gather(*cns_creation_tasks)

await asyncio.gather(*[update_cnss(), update_addresses(), update_telecoms()])

return modified_patients


@router.put("/patientcondition")
Expand Down
6 changes: 5 additions & 1 deletion api/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,8 @@ def merge_versions(current_objs, new_objs: dict) -> None:
for fingerprint in to_add
]

return deletions, insertions
return deletions, insertions

async def update_and_return(instance, new_data):
await instance.update_from_dict(new_data).save()
return instance

0 comments on commit 9806c8a

Please sign in to comment.