Skip to content

Commit

Permalink
Merge pull request #159 from prefeitura-rio/feat/improved-datalake-upload
Browse files Browse the repository at this point in the history

Feat/improved datalake upload
  • Loading branch information
TanookiVerde committed Jul 25, 2024
2 parents b7b493c + bbceae3 commit 53f8e7d
Show file tree
Hide file tree
Showing 7 changed files with 640 additions and 105 deletions.
53 changes: 27 additions & 26 deletions api/app/routers/entities_raw.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import asyncpg
import itertools

from datetime import (
datetime as dt,
Expand All @@ -14,28 +15,22 @@
from app.dependencies import get_current_active_user
from app.models import User, RawPatientRecord, RawPatientCondition, DataSource, RawEncounter
from app.enums import SystemEnum
from app.datalake import DatalakeUploader
from app.utils import (
unnester_encounter,
unnester_patientconditions,
unnester_patientrecords
)

from datalake.uploader import DatalakeUploader
from datalake.utils import get_formatter, apply_formatter


router = APIRouter(prefix="/raw", tags=["Entidades RAW (Formato Raw/Bruto)"])

entities_config = {
"patientrecords": {
"class": RawPatientRecord,
"unnester": unnester_patientrecords,
},
"patientconditions": {
"class": RawPatientCondition,
"unnester": unnester_patientconditions,
},
"encounter": {
"class": RawEncounter,
"unnester": unnester_encounter,
},
}

Expand Down Expand Up @@ -85,37 +80,41 @@ async def create_raw_data(
entity_name: Literal["patientrecords", "patientconditions", "encounter"],
current_user: Annotated[User, Depends(get_current_active_user)],
raw_data: RawDataListModel,
upload_to_datalake: bool = True,
upload_to_datalake: bool = False,
) -> BulkInsertOutputModel:

Entity = entities_config[entity_name]["class"]
unnester = entities_config[entity_name]["unnester"]
records = raw_data.dict().get("data_list")
data_source = await DataSource.get(cnes=raw_data.cnes)

raw_data = raw_data.dict()
cnes = raw_data.pop("cnes")
records = raw_data.pop("data_list")
# ====================
# SEND TO DATALAKE
# ====================
formatter = get_formatter(
system=data_source.system.value,
entity=entity_name
)

# Get DataSource
data_source = await DataSource.get(cnes=cnes)

if upload_to_datalake:
if upload_to_datalake and formatter:
uploader = DatalakeUploader(
biglake_table=True,
dataset_is_public=False,
dump_mode="append",
force_unique_file_name=True,
)

for name, dataframe in unnester(records):
for table_config, dataframe in apply_formatter(records, formatter).items():
uploader.upload(
dataframe=dataframe,
dataset_id=datalake_config[data_source.system],
table_id=f"{name}_eventos",
dataset_id=table_config.dataset_id,
table_id=table_config.table_id,
partition_by_date=True,
partition_column="updated_at",
partition_column=table_config.partition_column,
)

# Send to HCI Database
# ====================
# SAVE IN HCI DATABASE
# ====================
Entity = entities_config[entity_name]["class"]
try:
records_to_create = []
for record in records:
Expand All @@ -126,15 +125,17 @@ async def create_raw_data(
source_updated_at=record.get("source_updated_at"),
source_id=record.get("source_id"),
data=record.get("data"),
data_source=await DataSource.get(cnes=cnes),
data_source=data_source,
creator=current_user,
)
)
except ValidationError as e:
return HTMLResponse(status_code=400, content=str(e))
try:
new_records = await Entity.bulk_create(records_to_create, ignore_conflicts=True)
return {"count": len(new_records)}
return {
"count": len(new_records),
}
except asyncpg.exceptions.DeadlockDetectedError as e:
return HTMLResponse(status_code=400, content=str(e))

Expand Down
80 changes: 2 additions & 78 deletions api/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import datetime, timedelta
import hashlib
import json
from typing import Literal
import jwt
import copy
import pandas as pd
Expand Down Expand Up @@ -121,81 +122,4 @@ async def get_instance(Model, table, slug=None, code=None):
elif slug:
table[slug] = await Model.get_or_none(slug=slug)

return table[slug]


def unnester_encounter(payloads: list) -> list:
    """Unnest raw encounter payloads into ``(table_name, DataFrame)`` pairs.

    Each payload is a dict carrying ``source_id``, ``source_updated_at``,
    ``patient_cpf``, ``patient_code`` and a nested ``data`` dict.  Child list
    fields (vacinas, condicoes, ...) are split out into their own tables,
    tagged with the parent encounter id and timestamp; whatever remains in
    ``data`` becomes one row of the ``atendimento`` table.

    Args:
        payloads: List of raw encounter payload dicts.

    Returns:
        List of ``(table_name, pandas.DataFrame)`` tuples, one per table.
    """
    tables = {}

    # Deep-copy so popping nested fields never mutates the caller's payloads.
    for payload in copy.deepcopy(payloads):
        for field in (
            "vacinas",
            "condicoes",
            "encaminhamentos",
            "indicadores",
            "alergias_anamnese",
            "exames_solicitados",
            "prescricoes",
        ):
            for row in payload["data"].pop(field, []):
                # Tag child rows with the parent encounter id and timestamp.
                row["atendimento_id"] = payload["source_id"]
                row["updated_at"] = payload["source_updated_at"]
                # setdefault+append is O(1) per row (the old
                # `tables.get(f, []) + [row]` rebuilt the list each time).
                tables.setdefault(field, []).append(row)

        # Promote identifying fields into the flattened encounter row.
        payload["data"]['id'] = payload["source_id"]
        payload["data"]["updated_at"] = payload["source_updated_at"]
        payload["data"]["patient_cpf"] = payload["patient_cpf"]
        payload["data"]["patient_code"] = payload["patient_code"]

        tables.setdefault("atendimento", []).append(payload["data"])

    return [(table_name, pd.DataFrame(rows)) for table_name, rows in tables.items()]


def unnester_patientrecords(payloads: list) -> list:
    """Unnest raw patient-record payloads into ``(table_name, DataFrame)`` pairs.

    List fields (``telefones``, ``cns_provisorio``) are split into child
    tables keyed by ``patient_code``; the remaining ``data`` becomes one row
    of the ``paciente`` table.

    Args:
        payloads: List of raw patient-record payload dicts.

    Returns:
        List of ``(table_name, pandas.DataFrame)`` tuples, one per table.
    """
    tables = {}

    # Deep-copy so popping nested fields never mutates the caller's payloads.
    for payload in copy.deepcopy(payloads):
        for field in (
            "telefones",
            "cns_provisorio",
        ):
            for row in payload["data"].pop(field, []):
                # Tag child rows with their owning patient and timestamp.
                row["patient_code"] = payload["patient_code"]
                row["updated_at"] = payload["source_updated_at"]
                # setdefault+append is O(1) per row (the old
                # `tables.get(f, []) + [row]` rebuilt the list each time).
                tables.setdefault(field, []).append(row)

        # Promote identifying fields into the flattened patient row.
        payload["data"]['id'] = payload["patient_code"]
        payload["data"]["updated_at"] = payload["source_updated_at"]
        payload["data"]["patient_cpf"] = payload["patient_cpf"]
        payload["data"]["patient_code"] = payload["patient_code"]

        tables.setdefault("paciente", []).append(payload["data"])

    return [(table_name, pd.DataFrame(rows)) for table_name, rows in tables.items()]


def unnester_patientconditions(payloads: list) -> list:
    """Unnest raw patient-condition payloads into ``(table_name, DataFrame)`` pairs.

    Each payload's ``data`` dict becomes one row of the single
    ``resumo_diagnostico`` table, tagged with patient identifiers and the
    source timestamp.

    Args:
        payloads: List of raw patient-condition payload dicts.

    Returns:
        List of ``(table_name, pandas.DataFrame)`` tuples (at most one entry).
    """
    tables = {}

    # Deep-copy so the identifier injection never mutates the caller's payloads.
    for payload in copy.deepcopy(payloads):
        # Promote identifying fields into the flattened condition row.
        payload["data"]['id'] = payload["patient_code"]
        payload["data"]["updated_at"] = payload["source_updated_at"]
        payload["data"]["patient_cpf"] = payload["patient_cpf"]
        payload["data"]["patient_code"] = payload["patient_code"]

        # setdefault+append is O(1) per row (the old
        # `tables.get(k, []) + [row]` rebuilt the list each time).
        tables.setdefault("resumo_diagnostico", []).append(payload["data"])

    return [(table_name, pd.DataFrame(rows)) for table_name, rows in tables.items()]
return table[slug]
2 changes: 2 additions & 0 deletions api/datalake/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from datalake.utils import register_formatter
from datalake.formatters import *
137 changes: 137 additions & 0 deletions api/datalake/formatters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
# =============================================
# Formatters that are responsible for converting
# raw JSON records to Datalake table rows.
# =============================================
from typing import List, Tuple
from datalake.utils import flatten, register_formatter
from datalake.models import (
SMSRioCnsProvisorio,
SMSRioPaciente,
SMSRioTelefone,
VitacarePaciente,
VitacarePacienteHistorico,
VitacareAtendimento,
VitacareCondicao,
VitacareAlergia,
VitacareEncaminhamento,
VitacareExameSolicitado,
VitacareIndicador,
VitacarePrescricao,
VitacareVacina,
)


@register_formatter(system="smsrio", entity="patientrecords")
def format_smsrio_patient(
    raw_record: dict
) -> Tuple[List[SMSRioPaciente], List[SMSRioTelefone], List[SMSRioCnsProvisorio]]:
    """Format a raw SMSRio patient record into datalake table rows.

    Produces one ``SMSRioPaciente`` row plus zero or more phone and
    provisional-CNS rows extracted from the record's list fields.
    Note: mutates ``raw_record`` (stringifies the timestamp and pops the
    list fields from ``data``).
    """
    # Normalize the timestamp to a string before flattening.
    raw_record['source_updated_at'] = str(raw_record['source_updated_at'])

    # The patient row is built from the record flattened as-is
    # (before the list fields below are popped).
    patient_rows = [SMSRioPaciente(**flatten(raw_record))]

    extracted = {'telefones': [], 'cns_provisorio': []}
    for field_name, FieldModel in (
        ('telefones', SMSRioTelefone),
        ('cns_provisorio', SMSRioCnsProvisorio),
    ):
        # Skip fields absent from this record.
        if field_name not in raw_record['data']:
            continue

        values = raw_record['data'].pop(field_name) or []
        extracted[field_name] = [
            FieldModel(
                value=value,
                patient_cpf=raw_record.get("patient_cpf"),
                source_updated_at=raw_record.get("source_updated_at"),
            )
            for value in values
        ]

    return patient_rows, extracted['telefones'], extracted['cns_provisorio']


@register_formatter(system="vitacare", entity="patientrecords")
def format_vitacare_patient(
    raw_record: dict
) -> Tuple[List[VitacarePaciente | VitacarePacienteHistorico]]:
    """Format a raw Vitacare patient record into a single datalake row.

    Returns a one-element tuple holding a one-row list; the row model is
    either the routine or the historic patient schema.
    Note: mutates ``raw_record`` (stringifies the timestamp).
    """
    # Normalize the timestamp to a string before flattening.
    raw_record['source_updated_at'] = str(raw_record['source_updated_at'])

    payload = flatten(raw_record, list_max_depth=0)

    # Temp criterium to discriminate between Routine and Historic format:
    # historic records carry an 'AP' key in their data.
    is_historic = 'AP' in raw_record['data'].keys()
    RowModel = VitacarePacienteHistorico if is_historic else VitacarePaciente

    return ([RowModel(**payload)],)


@register_formatter(system="vitacare", entity="encounter")
def format_vitacare_encounter(
    raw_record: dict
) -> Tuple[
    List[VitacareAtendimento],
    List[VitacareCondicao],
    List[VitacareAlergia],
    List[VitacareEncaminhamento],
    List[VitacareExameSolicitado],
    List[VitacareIndicador],
    List[VitacarePrescricao],
    List[VitacareVacina],
]:
    """Format a raw Vitacare encounter into datalake table rows.

    Produces one ``VitacareAtendimento`` row plus child rows for each of
    the encounter's nested list fields (conditions, allergies, referrals,
    requested exams, indicators, prescriptions, vaccines), in that order.
    Note: mutates ``raw_record`` (stringifies the timestamp and pops the
    list fields from ``data``).
    """
    # Normalize the timestamp to a string before flattening.
    raw_record['source_updated_at'] = str(raw_record['source_updated_at'])

    # The encounter row is built from the record flattened as-is
    # (before the child list fields below are popped).
    flattened_record = flatten(
        raw_record,
        dict_max_depth=3,
    )
    encounter_rows = [VitacareAtendimento(**flattened_record)]

    # Child tables, in the exact order the return tuple promises.
    child_specs = (
        ('condicoes', VitacareCondicao),
        ('alergias_anamnese', VitacareAlergia),
        ('encaminhamentos', VitacareEncaminhamento),
        ('exames_solicitados', VitacareExameSolicitado),
        ('indicadores', VitacareIndicador),
        ('prescricoes', VitacarePrescricao),
        ('vacinas', VitacareVacina),
    )

    child_tables = []
    for field_name, FieldModel in child_specs:
        # Fields absent from this record yield an empty table.
        if field_name not in raw_record['data']:
            child_tables.append([])
            continue

        items = raw_record['data'].pop(field_name) or []
        child_tables.append([
            FieldModel(
                patient_cpf=raw_record.get("patient_cpf"),
                atendimento_id=raw_record.get("source_id"),
                source_updated_at=raw_record.get("source_updated_at"),
                **fields,
            )
            for fields in items
        ])

    return (encounter_rows, *child_tables)
Loading

0 comments on commit 53f8e7d

Please sign in to comment.