Skip to content

Commit

Permalink
feat: compatibility with historic vitacare data
Browse files Browse the repository at this point in the history
  • Loading branch information
TanookiVerde committed Jul 25, 2024
1 parent 9f23398 commit bbceae3
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 16 deletions.
9 changes: 7 additions & 2 deletions api/datalake/formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
SMSRioPaciente,
SMSRioTelefone,
VitacarePaciente,
VitacarePacienteHistorico,
VitacareAtendimento,
VitacareCondicao,
VitacareAlergia,
Expand Down Expand Up @@ -62,13 +63,17 @@ def format_smsrio_patient(
@register_formatter(system="vitacare", entity="patientrecords")
def format_vitacare_patient(
    raw_record: dict
) -> Tuple[List[VitacarePaciente | VitacarePacienteHistorico]]:
    """
    Formats one raw Vitacare patient record into a single datalake row.

    Args:
        raw_record (dict): Raw patient record. It must contain the keys
            `source_updated_at` and `data`. The record is not modified.

    Returns:
        Tuple[List[VitacarePaciente | VitacarePacienteHistorico]]: A
            one-element tuple holding a one-row list. The row is a
            `VitacarePacienteHistorico` when `raw_record['data']` contains
            the key 'AP', otherwise a `VitacarePaciente`.

    Raises:
        KeyError: If `source_updated_at` or `data` is missing.
    """
    # Convert source_updated_at to string on a shallow copy, so the caller's
    # record keeps its original value.
    record = {
        **raw_record,
        'source_updated_at': str(raw_record['source_updated_at']),
    }

    flattened = flatten(record, list_max_depth=0)

    # Temporary criterion to discriminate between Routine and Historic format
    if 'AP' in raw_record['data']:
        return ([VitacarePacienteHistorico(**flattened)],)
    return ([VitacarePaciente(**flattened)],)


@register_formatter(system="vitacare", entity="encounter")
Expand Down
58 changes: 57 additions & 1 deletion api/datalake/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# -*- coding: utf-8 -*-
# =============================================
# Pydantic Models Representing Datalake Tables
# TABLE MODELS
# =============================================
# - Pydantic Models Representing Datalake Tables.
# These models describe the format that every
# row sent to the Datalake must follow.
# - Each model must also declare its table
# configuration (dataset, table name, etc.).
# =============================================
from typing import Optional
from pydantic import BaseModel
Expand Down Expand Up @@ -156,6 +162,56 @@ class Config:
partition_column = "source_updated_at"


class VitacarePacienteHistorico(BaseModel):
    """
    Row model for Vitacare patient records in the historic format.

    Only the three identification fields below are annotated as plain `str`;
    every other field is annotated `Optional[str]`.
    """
    # Identification and bookkeeping fields.
    patient_cpf: str
    patient_code: str
    source_updated_at: str
    source_id: Optional[str]
    # Fields prefixed with `data__` presumably mirror the keys of the nested
    # `data` payload after flattening - TODO confirm the separator against
    # `flatten`.
    data__AP: Optional[str]
    data__SEXO: Optional[str]
    data__HIST_CID: Optional[str]
    data__RACA_COR: Optional[str]
    data__RELIGIAO: Optional[str]
    data__cpfPaciente: Optional[str]
    data__ESCOLARIDADE: Optional[str]
    data__dataConsulta: Optional[str]
    data__NACIONALIDADE: Optional[str]
    data__FREQUENTA_ESCOLA: Optional[str]
    data__SITUACAO_USUARIO: Optional[str]
    data__TELEFONE_CONTATO: Optional[str]
    data__dataNascPaciente: Optional[str]
    data__SITUACAO_FAMILIAR: Optional[str]
    data__TERRITORIO_SOCIAL: Optional[str]
    data__NUMERO_CNES_UNIDADE: Optional[str]
    # One field per year, 2018 through 2023.
    data__N_DE_CONSULTAS_2018: Optional[str]
    data__N_DE_CONSULTAS_2019: Optional[str]
    data__N_DE_CONSULTAS_2020: Optional[str]
    data__N_DE_CONSULTAS_2021: Optional[str]
    data__N_DE_CONSULTAS_2022: Optional[str]
    data__N_DE_CONSULTAS_2023: Optional[str]
    data__PACIENTE_TEMPORARIO: Optional[str]
    data__NOME_UNIDADE_DE_SAUDE: Optional[str]
    data__POSSUI_PLANO_DE_SAUDE: Optional[str]
    data__SITUACAO_PROFISSIONAL: Optional[str]
    data__MUNICIPIO_DE_NASCIMENTO: Optional[str]
    # One field per year, 2018 through 2023.
    data__N_DE_PROCEDIMENTOS_2018: Optional[str]
    data__N_DE_PROCEDIMENTOS_2019: Optional[str]
    data__N_DE_PROCEDIMENTOS_2020: Optional[str]
    data__N_DE_PROCEDIMENTOS_2021: Optional[str]
    data__N_DE_PROCEDIMENTOS_2022: Optional[str]
    data__N_DE_PROCEDIMENTOS_2023: Optional[str]
    data__PACIENTE_SITUACAO_RUA: Optional[str]
    data__CODIGO_DA_EQUIPE_DE_SAUDE: Optional[str]
    data__NOME_DA_PESSOA_CADASTRADA: Optional[str]
    data__N_CNS_DA_PESSOA_CADASTRADA: Optional[str]
    data__NOME_DA_MAE_PESSOA_CADASTRADA: Optional[str]

    # Destination table settings: dataset, table name and partition column.
    class Config:
        dataset_id = "brutos_prontuario_vitacare"
        table_id = "paciente_historico_eventos"
        partition_column = "source_updated_at"


class VitacareAtendimento(BaseModel):
patient_cpf: str
patient_code: str
Expand Down
32 changes: 19 additions & 13 deletions api/datalake/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from loguru import logger


# Dicionário global para armazenar os formatters
formatters = {}
REGISTERED_FORMATTERS = {}


def register_formatter(system: str, entity: str):
    """
    Builds a decorator that registers a formatter for a (system, entity) pair.

    Args:
        system (str): First element of the registry key.
        entity (str): Second element of the registry key.

    Returns:
        function: A decorator that logs the registration, stores the
            decorated function in `REGISTERED_FORMATTERS` under the key
            `(system, entity)` and returns that function unchanged.
    """
    registry_key = (system, entity)

    def decorator(func):
        message = f"Registering formatter for {system} - {entity}: {func.__name__}"
        logger.info(message)
        REGISTERED_FORMATTERS[registry_key] = func
        return func

    return decorator


# Função para acessar o formatter
def get_formatter(system: str, entity: str):
    """
    Retrieves the formatter function for the specified system and entity.

    Args:
        system (str): First element of the registry key.
        entity (str): Second element of the registry key.

    Returns:
        function: The value stored in `REGISTERED_FORMATTERS` under
            `(system, entity)`, or None when nothing is stored there. A
            warning is logged when the lookup result is falsy.
    """
    lookup_key = (system, entity)
    found = REGISTERED_FORMATTERS.get(lookup_key)
    if found:
        return found

    logger.warning(f"No formatter implemented for ({system},{entity})")
    return found
Expand Down Expand Up @@ -67,7 +67,7 @@ def flatten(
if isinstance(content, dict):
if depth < dict_max_depth:
flattened = flatten(
content,
content,
depth=depth + 1,
dict_max_depth=dict_max_depth,
list_max_depth=list_max_depth
Expand All @@ -80,11 +80,11 @@ def flatten(
updated_record[field] = str(content)
else:
updated_record[field] = content

return updated_record


def apply_formatter(records: list[dict], formatter) -> dict:
    """
    Applies a formatter function to a list of records and returns a dictionary of formatted tables.

    Args:
        records (list[dict]): Raw records. Each one is passed to `formatter`
            on its own.
        formatter (function): Callable that receives one record and returns
            an iterable of row sets, each row set being an iterable of rows.
            Every row must expose a `Config` attribute (used as the table
            key) and a `dict()` method.

    Returns:
        dict: Maps each distinct `row.Config` to a pandas DataFrame built
            from the `dict()` of every row sharing that `Config`, in the
            order the rows were produced. Empty when there are no rows.

    Raises:
        Exception: Any exception raised by `formatter` is logged and then
            re-raised unchanged.
    """
    tables = {}

    for record in records:
        try:
            formatted_record = formatter(record)
        except Exception as e:
            logger.error(f"An error occurred during the process {e}")
            # Bare raise propagates the original exception and traceback.
            raise

        for row_set in formatted_record:
            for row in row_set:
                # Group rows by destination table configuration.
                tables.setdefault(row.Config, []).append(row)

    return {
        table_config: pd.DataFrame([row.dict() for row in rows])
        for table_config, rows in tables.items()
    }

0 comments on commit bbceae3

Please sign in to comment.