Skip to content

Commit

Permalink
Merge pull request #127 from prefeitura-rio/optim/patient-mrg-async-o…
Browse files Browse the repository at this point in the history
…ptimization

Patient MRG async optimization
  • Loading branch information
TanookiVerde committed Jun 11, 2024
2 parents 3be074a + 3e21939 commit ab741fc
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 87 deletions.
3 changes: 3 additions & 0 deletions api/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class PatientAddress(Model):
postal_code = fields.CharField(max_length=8, null=True)
period_start = fields.DateField(null=True)
period_end = fields.DateField(null=True)
fingerprint = fields.CharField(max_length=32, null=True)


class PatientTelecom(Model):
Expand All @@ -216,13 +217,15 @@ class PatientTelecom(Model):
rank = fields.IntField(null=True)
period_start = fields.DateField(null=True)
period_end = fields.DateField(null=True)
fingerprint = fields.CharField(max_length=32, null=True)


class PatientCns(Model):
id = fields.IntField(pk=True)
patient = fields.ForeignKeyField("app.Patient", related_name="patient_cns")
value = fields.CharField(max_length=16, unique=True)
is_main = fields.BooleanField(default=False)
fingerprint = fields.CharField(max_length=32, null=True)


class Patient(Model):
Expand Down
169 changes: 84 additions & 85 deletions api/app/routers/entities_mrg.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from typing import Annotated
import asyncio
from typing import Annotated, List

from fastapi import APIRouter, Depends
from fastapi.responses import HTMLResponse
Expand All @@ -22,116 +23,114 @@
ConditionCode,
PatientCns,
)
from app.utils import update_and_return


router = APIRouter(prefix="/mrg", tags=["Entidades MRG (Formato Merged/Fundido)"])
router = APIRouter(
prefix="/mrg", tags=["Entidades MRG (Formato Merged/Fundido)"])


PatientOutput = pydantic_model_creator(Patient, name="PatientOutput")
PatientConditionOutput = pydantic_model_creator(PatientCondition, name="PatientConditionOutput")
PatientConditionOutput = pydantic_model_creator(
PatientCondition, name="PatientConditionOutput")


@router.put("/patient")
async def create_or_update_patient(
_: Annotated[User, Depends(get_current_active_user)],
patients: list[PatientModel],
patients: List[PatientModel],
) -> list[PatientOutput]:

updated_patients = []
for patient in patients:
patient_data = patient.dict()
races = {x.slug: x for x in await Race.all()}
cities = {x.code: x for x in await City.all()}
genders = {x.slug: x for x in await Gender.all()}
nationalities = {x.slug: x for x in await Nationality.all()}

birth_city = await City.get_or_none(
code=patient_data["birth_city"],
state__code=patient_data["birth_state"],
state__country__code=patient_data["birth_country"],
)
new_data = {
"patient_cpf": patient_data.get("patient_cpf"),
"patient_code": patient_data.get("patient_code"),
"birth_date": patient_data.get("birth_date").isoformat(),
"active": patient_data.get("active"),
"protected_person": patient_data.get("protected_person"),
"deceased": patient_data.get("deceased"),
"deceased_date": patient_data.get("deceased_date"),
"name": patient_data.get("name"),
"mother_name": patient_data.get("mother_name"),
"father_name": patient_data.get("father_name"),
"birth_city": birth_city,
"race": await Race.get_or_none(slug=patient_data["race"]),
"gender": await Gender.get_or_none(slug=patient_data["gender"]),
"nationality": await Nationality.get_or_none(slug=patient_data["nationality"]),
}
patients = [patient.dict() for patient in patients]

try:
patient = await Patient.get_or_none(
patient_cpf=patient_data.get("patient_cpf")
addresses, cnss, telecoms = [], [], []
for patient in patients:
# Entity Splitting
addresses.append(patient.pop('address_list'))
cnss.append(patient.pop('cns_list'))
telecoms.append(patient.pop('telecom_list'))

# Object Convertions
patient['race'] = races.get(patient['race'])
patient['birth_city'] = cities.get(patient.get('birth_city'))
patient['gender'] = genders.get(patient.get('gender'))
patient['nationality'] = nationalities.get(patient.get('nationality'))
patient['birth_date'] = patient['birth_date'].isoformat()

try:
existing_patients = [
Patient.get_or_none(
patient_cpf=x['patient_cpf']
).prefetch_related("address_patient_periods", "telecom_patient_periods", "patient_cns")
except ValidationError as e:
return HTMLResponse(status_code=400, content=str(e))
for x in patients
]
existing_patients = await asyncio.gather(*existing_patients)
except ValidationError as e:
return HTMLResponse(
status_code=400,
content=f"Error fetching existing patients: {e}"
)

if patient is not None:
await patient.update_from_dict(new_data).save()
awaitables = []
for i, patient in enumerate(patients):
if existing_patients[i]:
awaitables.append(update_and_return(existing_patients[i], patient))
else:
try:
patient = await Patient.create(**new_data)
awaitables.append(Patient.create(**patient))
except ValidationError as e:
return HTMLResponse(status_code=400, content=str(e))
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))

# Reset de Address
for instance in patient.address_patient_periods.related_objects:
await instance.delete()

address_list = patient_data.get("address_list", [])
if address_list is not None:
for address in address_list:
address_city = await City.get_or_none(
code=address["city"],
state__code=address["state"],
state__country__code=address["country"],
return HTMLResponse(
status_code=400,
content=f"Error Creating Patients: {e}"
)
address["patient"] = patient
address["city"] = address_city
address["period_start"] = address.get("start")
address["period_end"] = address.get("end")
await PatientAddress.create(**address)

# Reset de Telecom
for instance in patient.telecom_patient_periods.related_objects:
await instance.delete()
modified_patients = await asyncio.gather(*awaitables)

telecom_list = patient_data.get("telecom_list", [])
if telecom_list is not None:
async def update_addresses():
addresses_to_insert = []
for i, address_list in enumerate(addresses):
for address in address_list:
address["patient"] = modified_patients[i]
address["city"] = cities.get(address.pop("city"))
address["period_start"] = address.pop("start")
address["period_end"] = address.pop("end")
addresses_to_insert.append(PatientAddress(**address))
await PatientAddress.filter(patient_id__in=[x.id for x in modified_patients]).delete()
await PatientAddress.bulk_create(addresses_to_insert)

async def update_telecoms():
telecoms_to_insert = []
for i, telecom_list in enumerate(telecoms):
for telecom in telecom_list:
telecom["patient"] = patient
telecom["patient"] = modified_patients[i]
telecom["period_start"] = telecom.get("start")
telecom["period_end"] = telecom.get("end")
await PatientTelecom.create(**telecom)
telecoms_to_insert.append(PatientTelecom(**telecom))
await PatientTelecom.filter(patient_id__in=[x.id for x in modified_patients]).delete()
await PatientTelecom.bulk_create(telecoms_to_insert)

# Reset de CNS
for instance in patient.patient_cns.related_objects:
await instance.delete()
async def update_cnss():
async def create_cns(cns_params):
try:
await PatientCns.create(**cns_params)
except IntegrityError:
await PatientCns.get(value=cns_params["value"]).delete()

cns_creation_tasks = []
for i, cns_list in enumerate(cnss):
for cns in cns_list:
cns["patient"] = modified_patients[i]
cns_creation_tasks.append(create_cns(cns))
await PatientCns.filter(patient_id__in=[x.id for x in modified_patients]).delete()
await asyncio.gather(*cns_creation_tasks)

await asyncio.gather(*[update_cnss(), update_addresses(), update_telecoms()])

cns_list = patient_data.get("cns_list", [])
if cns_list is not None:
for cns in patient_data.get("cns_list", []):
cns["patient"] = patient
try:
await PatientCns.create(**cns)
except IntegrityError:
# CNS already exists:
# - Don't trust both CNS
# - Delete the old CNS
patient_cns = await PatientCns.get_or_none(value=cns["value"])
if patient_cns:
await patient_cns.delete()

patient_instance = await PatientOutput.from_tortoise_orm(patient)
updated_patients.append(patient_instance)

return updated_patients
return modified_patients


@router.put("/patientcondition")
Expand Down
42 changes: 40 additions & 2 deletions api/app/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta

import hashlib
import json
import jwt
from passlib.context import CryptContext

Expand Down Expand Up @@ -72,4 +73,41 @@ def password_verify(password: str, hashed: str) -> bool:
Returns:
bool: True if the password matches the hash, False otherwise.
"""
return pwd_context.verify(password, hashed)
return pwd_context.verify(password, hashed)


def generate_dictionary_fingerprint(dict_obj: dict) -> str:
"""
Generate a fingerprint for a dictionary object.
Args:
dict_obj (dict): The dictionary object to generate the fingerprint for.
Returns:
str: The MD5 hash of the serialized dictionary object.
"""
serialized_obj = json.dumps(dict_obj, sort_keys=True)
return hashlib.md5(serialized_obj.encode('utf-8')).hexdigest()

def merge_versions(current_objs, new_objs: dict) -> None:
current_fingerprints = {obj.fingerprint: obj for obj in current_objs}
new_fingerprints = {obj.get("fingerprint"): obj for obj in new_objs}

to_delete = current_fingerprints.keys() - new_fingerprints.keys()
to_add = new_fingerprints.keys() - current_fingerprints.keys()

deletions = [
current_fingerprints[fingerprint]
for fingerprint in to_delete
]
insertions = [
new_fingerprints[fingerprint]
for fingerprint in to_add
]

return deletions, insertions

async def update_and_return(instance, new_data):
await instance.update_from_dict(new_data).save()
return instance
16 changes: 16 additions & 0 deletions api/migrations/app/15_20240606152620_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
from tortoise import BaseDBAsyncClient


async def upgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "patientaddress" ADD "fingerprint" VARCHAR(32);
ALTER TABLE "patientcns" ADD "fingerprint" VARCHAR(32);
ALTER TABLE "patienttelecom" ADD "fingerprint" VARCHAR(32);"""


async def downgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "patientcns" DROP COLUMN "fingerprint";
ALTER TABLE "patientaddress" DROP COLUMN "fingerprint";
ALTER TABLE "patienttelecom" DROP COLUMN "fingerprint";"""

0 comments on commit ab741fc

Please sign in to comment.