From bbceae315352e7139efb94a63461ec1d0fd003bc Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 17:44:11 -0300 Subject: [PATCH] feat: compatibility with historic vitacare data --- api/datalake/formatters.py | 9 ++++-- api/datalake/models.py | 58 +++++++++++++++++++++++++++++++++++++- api/datalake/utils.py | 32 ++++++++++++--------- 3 files changed, 83 insertions(+), 16 deletions(-) diff --git a/api/datalake/formatters.py b/api/datalake/formatters.py index ae781c1..2c28842 100644 --- a/api/datalake/formatters.py +++ b/api/datalake/formatters.py @@ -10,6 +10,7 @@ SMSRioPaciente, SMSRioTelefone, VitacarePaciente, + VitacarePacienteHistorico, VitacareAtendimento, VitacareCondicao, VitacareAlergia, @@ -62,13 +63,17 @@ def format_smsrio_patient( @register_formatter(system="vitacare", entity="patientrecords") def format_vitacare_patient( raw_record: dict -) -> Tuple[List[SMSRioPaciente]]: +) -> Tuple[List[VitacarePaciente | VitacarePacienteHistorico]]: # Convert source_updated_at to string raw_record['source_updated_at'] = str(raw_record['source_updated_at']) flattened = flatten(raw_record, list_max_depth=0) - return ([VitacarePaciente(**flattened)],) + # Temporary criterion to discriminate between Routine and Historic format + if 'AP' in raw_record['data'].keys(): + return ([VitacarePacienteHistorico(**flattened)],) + else: + return ([VitacarePaciente(**flattened)],) @register_formatter(system="vitacare", entity="encounter") diff --git a/api/datalake/models.py b/api/datalake/models.py index 204ae05..4839441 100644 --- a/api/datalake/models.py +++ b/api/datalake/models.py @@ -1,6 +1,12 @@ # -*- coding: utf-8 -*- # ============================================= -# Pydantic Models Representing Datalake Tables +# TABLE MODELS +# ============================================= +# - Pydantic Models Representing Datalake Tables. +# These models describe the format that every +# row sent to the Datalake must follow. 
+# - Also, configuration of the table name, +# dataset, etc must be provided. # ============================================= from typing import Optional from pydantic import BaseModel @@ -156,6 +162,56 @@ class Config: partition_column = "source_updated_at" +class VitacarePacienteHistorico(BaseModel): + patient_cpf: str + patient_code: str + source_updated_at: str + source_id: Optional[str] + data__AP: Optional[str] + data__SEXO: Optional[str] + data__HIST_CID: Optional[str] + data__RACA_COR: Optional[str] + data__RELIGIAO: Optional[str] + data__cpfPaciente: Optional[str] + data__ESCOLARIDADE: Optional[str] + data__dataConsulta: Optional[str] + data__NACIONALIDADE: Optional[str] + data__FREQUENTA_ESCOLA: Optional[str] + data__SITUACAO_USUARIO: Optional[str] + data__TELEFONE_CONTATO: Optional[str] + data__dataNascPaciente: Optional[str] + data__SITUACAO_FAMILIAR: Optional[str] + data__TERRITORIO_SOCIAL: Optional[str] + data__NUMERO_CNES_UNIDADE: Optional[str] + data__N_DE_CONSULTAS_2018: Optional[str] + data__N_DE_CONSULTAS_2019: Optional[str] + data__N_DE_CONSULTAS_2020: Optional[str] + data__N_DE_CONSULTAS_2021: Optional[str] + data__N_DE_CONSULTAS_2022: Optional[str] + data__N_DE_CONSULTAS_2023: Optional[str] + data__PACIENTE_TEMPORARIO: Optional[str] + data__NOME_UNIDADE_DE_SAUDE: Optional[str] + data__POSSUI_PLANO_DE_SAUDE: Optional[str] + data__SITUACAO_PROFISSIONAL: Optional[str] + data__MUNICIPIO_DE_NASCIMENTO: Optional[str] + data__N_DE_PROCEDIMENTOS_2018: Optional[str] + data__N_DE_PROCEDIMENTOS_2019: Optional[str] + data__N_DE_PROCEDIMENTOS_2020: Optional[str] + data__N_DE_PROCEDIMENTOS_2021: Optional[str] + data__N_DE_PROCEDIMENTOS_2022: Optional[str] + data__N_DE_PROCEDIMENTOS_2023: Optional[str] + data__PACIENTE_SITUACAO_RUA: Optional[str] + data__CODIGO_DA_EQUIPE_DE_SAUDE: Optional[str] + data__NOME_DA_PESSOA_CADASTRADA: Optional[str] + data__N_CNS_DA_PESSOA_CADASTRADA: Optional[str] + data__NOME_DA_MAE_PESSOA_CADASTRADA: Optional[str] + + class 
Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "paciente_historico_eventos" + partition_column = "source_updated_at" + + class VitacareAtendimento(BaseModel): patient_cpf: str patient_code: str diff --git a/api/datalake/utils.py b/api/datalake/utils.py index d3704d3..2ff94dd 100644 --- a/api/datalake/utils.py +++ b/api/datalake/utils.py @@ -4,8 +4,8 @@ from loguru import logger -# Dicionário global para armazenar os formatters -formatters = {} +REGISTERED_FORMATTERS = {} + def register_formatter(system: str, entity: str): """ @@ -19,13 +19,13 @@ def register_formatter(system: str, entity: str): function: The decorated function. """ def decorator(func): - logger.info(f"Registering formatter for {system} - {entity}: {func.__name__}") - formatters[(system, entity)] = func + logger.info( + f"Registering formatter for {system} - {entity}: {func.__name__}") + REGISTERED_FORMATTERS[(system, entity)] = func return func return decorator -# Função para acessar o formatter def get_formatter(system: str, entity: str): """ Retrieves the formatter function for the specified system and entity. @@ -37,7 +37,7 @@ def get_formatter(system: str, entity: str): Returns: function: The formatter function for the specified system and entity. 
""" - formatter = formatters.get((system, entity)) + formatter = REGISTERED_FORMATTERS.get((system, entity)) if not formatter: logger.warning(f"No formatter implemented for ({system},{entity})") return formatter @@ -67,7 +67,7 @@ def flatten( if isinstance(content, dict): if depth < dict_max_depth: flattened = flatten( - content, + content, depth=depth + 1, dict_max_depth=dict_max_depth, list_max_depth=list_max_depth @@ -80,11 +80,11 @@ def flatten( updated_record[field] = str(content) else: updated_record[field] = content - + return updated_record -def apply_formatter(records:list[dict], formatter) -> dict: +def apply_formatter(records: list[dict], formatter) -> dict: """ Applies a formatter function to a list of records and returns a dictionary of formatted tables. @@ -98,14 +98,20 @@ def apply_formatter(records:list[dict], formatter) -> dict: tables = {} for record in records: - for row_set in formatter(record): + try: + formatted_record = formatter(record) + except Exception as e: + logger.error(f"An error occured during the process {e}") + raise e + + for row_set in formatted_record: for row in row_set: if row.Config in tables: tables[row.Config].append(row) else: tables[row.Config] = [row] - + for table_config, rows in tables.items(): tables[table_config] = pd.DataFrame([row.dict() for row in rows]) - - return tables \ No newline at end of file + + return tables