From 7cf6b357adbce8482249f3bb6f726df0b3d8e020 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 15:23:08 -0300 Subject: [PATCH 01/10] Refactor Datalake Sending logic --- api/app/routers/entities_raw.py | 74 ++++++++------- api/app/utils.py | 80 +--------------- api/datalake/__init__.py | 2 + api/datalake/formatters.py | 46 ++++++++++ api/datalake/models.py | 66 +++++++++++++ api/{app/datalake.py => datalake/uploader.py} | 1 - api/datalake/utils.py | 92 +++++++++++++++++++ 7 files changed, 244 insertions(+), 117 deletions(-) create mode 100644 api/datalake/__init__.py create mode 100644 api/datalake/formatters.py create mode 100644 api/datalake/models.py rename api/{app/datalake.py => datalake/uploader.py} (99%) create mode 100644 api/datalake/utils.py diff --git a/api/app/routers/entities_raw.py b/api/app/routers/entities_raw.py index 39614f1..d9b2115 100644 --- a/api/app/routers/entities_raw.py +++ b/api/app/routers/entities_raw.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import asyncpg +import itertools from datetime import ( datetime as dt, @@ -14,12 +15,9 @@ from app.dependencies import get_current_active_user from app.models import User, RawPatientRecord, RawPatientCondition, DataSource, RawEncounter from app.enums import SystemEnum -from app.datalake import DatalakeUploader -from app.utils import ( - unnester_encounter, - unnester_patientconditions, - unnester_patientrecords -) + +from datalake.uploader import DatalakeUploader +from datalake.utils import get_formatter, apply_formatter router = APIRouter(prefix="/raw", tags=["Entidades RAW (Formato Raw/Bruto)"]) @@ -27,15 +25,12 @@ entities_config = { "patientrecords": { "class": RawPatientRecord, - "unnester": unnester_patientrecords, }, "patientconditions": { "class": RawPatientCondition, - "unnester": unnester_patientconditions, }, "encounter": { "class": RawEncounter, - "unnester": unnester_encounter, }, } @@ -85,37 +80,38 @@ async def create_raw_data( entity_name: 
Literal["patientrecords", "patientconditions", "encounter"], current_user: Annotated[User, Depends(get_current_active_user)], raw_data: RawDataListModel, - upload_to_datalake: bool = True, ) -> BulkInsertOutputModel: - Entity = entities_config[entity_name]["class"] - unnester = entities_config[entity_name]["unnester"] - - raw_data = raw_data.dict() - cnes = raw_data.pop("cnes") - records = raw_data.pop("data_list") - - # Get DataSource - data_source = await DataSource.get(cnes=cnes) - - if upload_to_datalake: - uploader = DatalakeUploader( - biglake_table=True, - dataset_is_public=False, - dump_mode="append", - force_unique_file_name=True, + records = raw_data.dict().get("data_list") + data_source = await DataSource.get(cnes=raw_data.cnes) + + # ==================== + # SEND TO DATALAKE + # ==================== + formatter = get_formatter( + system=data_source.system.value, + entity=entity_name + ) + uploader = DatalakeUploader( + biglake_table=True, + dataset_is_public=False, + dump_mode="append", + force_unique_file_name=True, + ) + + for table_config, dataframe in apply_formatter(records, formatter).items(): + uploader.upload( + dataframe=dataframe, + dataset_id=table_config.dataset_id, + table_id=table_config.table_id, + partition_by_date=True, + partition_column=table_config.partition_column, ) - for name, dataframe in unnester(records): - uploader.upload( - dataframe=dataframe, - dataset_id=datalake_config[data_source.system], - table_id=f"{name}_eventos", - partition_by_date=True, - partition_column="updated_at", - ) - - # Send to HCI Database + # ==================== + # SAVE IN HCI DATABASE + # ==================== + Entity = entities_config[entity_name]["class"] try: records_to_create = [] for record in records: @@ -126,7 +122,7 @@ async def create_raw_data( source_updated_at=record.get("source_updated_at"), source_id=record.get("source_id"), data=record.get("data"), - data_source=await DataSource.get(cnes=cnes), + data_source=data_source, 
creator=current_user, ) ) @@ -134,7 +130,9 @@ async def create_raw_data( return HTMLResponse(status_code=400, content=str(e)) try: new_records = await Entity.bulk_create(records_to_create, ignore_conflicts=True) - return {"count": len(new_records)} + return { + "count": len(new_records), + } except asyncpg.exceptions.DeadlockDetectedError as e: return HTMLResponse(status_code=400, content=str(e)) diff --git a/api/app/utils.py b/api/app/utils.py index 3c4364f..ed00ed7 100644 --- a/api/app/utils.py +++ b/api/app/utils.py @@ -2,6 +2,7 @@ from datetime import datetime, timedelta import hashlib import json +from typing import Literal import jwt import copy import pandas as pd @@ -121,81 +122,4 @@ async def get_instance(Model, table, slug=None, code=None): elif slug: table[slug] = await Model.get_or_none(slug=slug) - return table[slug] - - -def unnester_encounter(payloads: dict) -> pd.DataFrame: - tables = {} - - for payload in copy.deepcopy(payloads): - for field in [ - "vacinas", - "condicoes", - "encaminhamentos", - "indicadores", - "alergias_anamnese", - "exames_solicitados", - "prescricoes", - ]: - for row in payload["data"].pop(field, []): - row["atendimento_id"] = payload["source_id"] - row["updated_at"] = payload["source_updated_at"] - tables[field] = tables.get(field, []) + [row] - - payload["data"]['id'] = payload["source_id"] - payload["data"]["updated_at"] = payload["source_updated_at"] - payload["data"]["patient_cpf"] = payload["patient_cpf"] - payload["data"]["patient_code"] = payload["patient_code"] - - tables["atendimento"] = tables.get("atendimento", []) + [payload["data"]] - - result = [] - for table_name, rows in tables.items(): - result.append((table_name, pd.DataFrame(rows))) - - return result - - -def unnester_patientrecords(payloads: dict) -> pd.DataFrame: - tables = {} - - for payload in copy.deepcopy(payloads): - for field in [ - "telefones", - "cns_provisorio" - ]: - for row in payload["data"].pop(field, []): - row["patient_code"] = 
payload["patient_code"] - row["updated_at"] = payload["source_updated_at"] - tables[field] = tables.get(field, []) + [row] - - payload["data"]['id'] = payload["patient_code"] - payload["data"]["updated_at"] = payload["source_updated_at"] - payload["data"]["patient_cpf"] = payload["patient_cpf"] - payload["data"]["patient_code"] = payload["patient_code"] - - tables["paciente"] = tables.get("paciente", []) + [payload["data"]] - - result = [] - for table_name, rows in tables.items(): - result.append((table_name, pd.DataFrame(rows))) - - return result - - -def unnester_patientconditions(payloads: dict) -> pd.DataFrame: - tables = {} - - for payload in copy.deepcopy(payloads): - payload["data"]['id'] = payload["patient_code"] - payload["data"]["updated_at"] = payload["source_updated_at"] - payload["data"]["patient_cpf"] = payload["patient_cpf"] - payload["data"]["patient_code"] = payload["patient_code"] - - tables["resumo_diagnostico"] = tables.get("resumo_diagnostico", []) + [payload["data"]] - - result = [] - for table_name, rows in tables.items(): - result.append((table_name, pd.DataFrame(rows))) - - return result + return table[slug] \ No newline at end of file diff --git a/api/datalake/__init__.py b/api/datalake/__init__.py new file mode 100644 index 0000000..8dc7775 --- /dev/null +++ b/api/datalake/__init__.py @@ -0,0 +1,2 @@ +from datalake.utils import register_formatter +from datalake.formatters import * \ No newline at end of file diff --git a/api/datalake/formatters.py b/api/datalake/formatters.py new file mode 100644 index 0000000..bf5d908 --- /dev/null +++ b/api/datalake/formatters.py @@ -0,0 +1,46 @@ +from typing import List, Tuple + +from datalake.utils import flatten, register_formatter +from datalake.models import ( + SMSRioCnsProvisorio, + SMSRioPaciente, + SMSRioTelefones, +) + + +@register_formatter(system="smsrio", entity="patientrecords") +def format_smsrio_patient( + raw_record: dict +) -> Tuple[List[SMSRioPaciente], List[SMSRioTelefones], 
List[SMSRioCnsProvisorio]]: + # Convert source_updated_at to string + raw_record['source_updated_at'] = str(raw_record['source_updated_at']) + + # Flatten Record + flattened_patient = flatten(raw_record) + + # Initialize Tables + rows = { + "pacientes": [SMSRioPaciente(**flattened_patient)], + "telefones": [], + "cns_provisorio": [], + } + + # Create Tables for List Fields + for field_name, FieldModel in [ + ('telefones', SMSRioTelefones), + ('cns_provisorio', SMSRioCnsProvisorio) + ]: + # If field not in record, skip + if field_name not in raw_record['data']: + continue + + for value in raw_record['data'].pop(field_name) or []: + rows[field_name].append( + FieldModel( + value=value, + patient_cpf=raw_record.get("patient_cpf"), + source_updated_at=raw_record.get("source_updated_at") + ) + ) + + return rows['pacientes'], rows['telefones'], rows['cns_provisorio'] \ No newline at end of file diff --git a/api/datalake/models.py b/api/datalake/models.py new file mode 100644 index 0000000..6d0525a --- /dev/null +++ b/api/datalake/models.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# ============================================= +# Pydantic Models Representing Datalake Tables +# ============================================= +from datetime import date, datetime +from typing import Generic, Optional, List, TypeVar, Any +from pydantic import BaseModel + + +class SMSRioPaciente(BaseModel): + patient_cpf: str + source_updated_at: str + source_id: Optional[str] + data__nome: str + data__nome_mae: str + data__nome_pai: str + data__dt_nasc: str + data__sexo: str + data__racaCor: str + data__nacionalidade: str + data__obito: str + data__dt_obito: Optional[str] + data__end_tp_logrado_cod: Optional[str] + data__end_logrado: str + data__end_numero: str + data__end_comunidade: Optional[str] + data__end_complem: str + data__end_bairro: str + data__end_cep: str + data__cod_mun_res: Optional[str] + data__uf_res: str + data__cod_mun_nasc: str + data__uf_nasc: str + data__cod_pais_nasc: 
Optional[str] + data__email: Optional[str] + data__timestamp: str + data__cns_provisorio: list[str] + data__telefones: list[str] + + class Config: + dataset_id = "brutos_plataforma_smsrio" + table_id = "paciente_eventos" + partition_column = "source_updated_at" + + +class SMSRioTelefones(BaseModel): + patient_cpf: str + value: str + source_updated_at: str + + class Config: + dataset_id = "brutos_plataforma_smsrio" + table_id = "paciente_telefone_eventos" + partition_column = "source_updated_at" + + +class SMSRioCnsProvisorio(BaseModel): + patient_cpf: str + value: str + source_updated_at: str + + class Config: + dataset_id = "brutos_plataforma_smsrio" + table_id = "paciente_cns_eventos" + partition_column = "source_updated_at" + \ No newline at end of file diff --git a/api/app/datalake.py b/api/datalake/uploader.py similarity index 99% rename from api/app/datalake.py rename to api/datalake/uploader.py index d16d849..10a0cbf 100644 --- a/api/app/datalake.py +++ b/api/datalake/uploader.py @@ -47,7 +47,6 @@ def _validate_envs(self) -> None: raise ValueError(f"Missing environment variables: {missing_envs}") def _prepare_gcp_credential(self) -> None: - base64_credential = os.environ["BASEDOSDADOS_CREDENTIALS_PROD"] with open("/tmp/credentials.json", "wb") as f: diff --git a/api/datalake/utils.py b/api/datalake/utils.py new file mode 100644 index 0000000..360fbed --- /dev/null +++ b/api/datalake/utils.py @@ -0,0 +1,92 @@ +import pandas as pd +from loguru import logger + + +# Dicionário global para armazenar os formatters +formatters = {} + +def register_formatter(system: str, entity: str): + """ + Decorator function to register a formatter for a specific system and entity. + + Args: + system (str): The name of the system. + entity (str): The name of the entity. + + Returns: + function: The decorated function. 
+ """ + def decorator(func): + logger.info(f"Registering formatter for {system} - {entity}: {func.__name__}") + formatters[(system, entity)] = func + return func + return decorator + + +# Função para acessar o formatter +def get_formatter(system: str, entity: str): + """ + Retrieves the formatter function for the specified system and entity. + + Args: + system (str): The name of the system. + entity (str): The name of the entity. + + Returns: + function: The formatter function for the specified system and entity. + + Raises: + AssertionError: If the formatter for the specified system and entity is not found. + """ + assert (system, entity) in formatters, f"Formatter for {system} - {entity} not found" + return formatters.get((system, entity)) + + +# Função para aplanar um dicionário +def flatten( + record: dict, + max_depth: int = 2, + depth: int = 0, +) -> dict: + """ + Flattens a nested dictionary by concatenating keys with '__' separator. + + Args: + record (dict): The nested dictionary to be flattened. + max_depth (int, optional): The maximum depth to flatten. Defaults to 2. + depth (int, optional): The current depth of recursion. Defaults to 0. + + Returns: + dict: The flattened dictionary. 
+ """ + updated_record = {} + for field, content in record.items(): + if isinstance(content, dict): + if depth < max_depth: + for key, value in flatten(content, depth=depth + 1).items(): + updated_record[f"{field}__{key}"] = value + else: + updated_record[field] = str(content) + elif isinstance(content, list) and depth > 1: + updated_record[field] = str(content) + else: + updated_record[field] = content + + return updated_record + + +def apply_formatter(records:list[dict], formatter) -> dict: + tables = {} + + for record in records: + for row_set in formatter(record): + for row in row_set: + if row.Config in tables: + tables[row.Config].append(row) + else: + tables[row.Config] = [row] + + for table_config, rows in tables.items(): + tables[table_config] = pd.DataFrame([row.dict() for row in rows]) + + return tables \ No newline at end of file From 68b60a2f492291a4665dcf04e7a515fcc79723cc Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 16:54:19 -0300 Subject: [PATCH 02/10] feat: implementing encounter formatters --- api/datalake/formatters.py | 90 +++++++++++- api/datalake/models.py | 272 ++++++++++++++++++++++++++++++++++--- api/datalake/utils.py | 36 +++-- 3 files changed, 367 insertions(+), 31 deletions(-) diff --git a/api/datalake/formatters.py b/api/datalake/formatters.py index bf5d908..83d5851 100644 --- a/api/datalake/formatters.py +++ b/api/datalake/formatters.py @@ -4,14 +4,23 @@ from datalake.models import ( SMSRioCnsProvisorio, SMSRioPaciente, - SMSRioTelefones, + SMSRioTelefone, + VitacarePaciente, + VitacareAtendimento, + VitacareCondicao, + VitacareAlergia, + VitacareEncaminhamento, + VitacareExameSolicitado, + VitacareIndicador, + VitacarePrescricao, + VitacareVacina, ) @register_formatter(system="smsrio", entity="patientrecords") def format_smsrio_patient( raw_record: dict -) -> Tuple[List[SMSRioPaciente], List[SMSRioTelefones], List[SMSRioCnsProvisorio]]: +) -> Tuple[List[SMSRioPaciente], List[SMSRioTelefone], 
List[SMSRioCnsProvisorio]]: # Convert source_updated_at to string raw_record['source_updated_at'] = str(raw_record['source_updated_at']) @@ -27,7 +36,7 @@ def format_smsrio_patient( # Create Tables for List Fields for field_name, FieldModel in [ - ('telefones', SMSRioTelefones), + ('telefones', SMSRioTelefone), ('cns_provisorio', SMSRioCnsProvisorio) ]: # If field not in record, skip @@ -43,4 +52,77 @@ def format_smsrio_patient( ) ) - return rows['pacientes'], rows['telefones'], rows['cns_provisorio'] \ No newline at end of file + return rows['pacientes'], rows['telefones'], rows['cns_provisorio'] + + +@register_formatter(system="vitacare", entity="patientrecords") +def format_vitacare_patient( + raw_record: dict +) -> Tuple[List[SMSRioPaciente]]: + # Convert source_updated_at to string + raw_record['source_updated_at'] = str(raw_record['source_updated_at']) + + flattened = flatten(raw_record, list_max_depth=0) + + return ([VitacarePaciente(**flattened)],) + + +@register_formatter(system="vitacare", entity="encounter") +def format_vitacare_encounter( + raw_record: dict +) -> Tuple[ + List[VitacareAtendimento], + List[VitacareCondicao], + List[VitacareAlergia], + List[VitacareEncaminhamento], + List[VitacareExameSolicitado], + List[VitacareIndicador], + List[VitacarePrescricao], + List[VitacareVacina], + ]: + # Convert source_updated_at to string + raw_record['source_updated_at'] = str(raw_record['source_updated_at']) + + # Flatten Record + flattened = flatten( + raw_record, + dict_max_depth=3, + ) + + # Initialize Tables + rows = { + "encounter": [VitacareAtendimento(**flattened)], + "condicoes": [], + "alergias_anamnese": [], + "encaminhamentos": [], + "exames_solicitados": [], + "indicadores": [], + "prescricoes": [], + "vacinas": [], + } + + # Create Tables for List Fields + for field_name, FieldModel in [ + ('condicoes', VitacareCondicao), + ('alergias_anamnese', VitacareAlergia), + ('encaminhamentos', VitacareEncaminhamento), + ('exames_solicitados', 
VitacareExameSolicitado), + ('indicadores', VitacareIndicador), + ('prescricoes', VitacarePrescricao), + ('vacinas', VitacareVacina) + ]: + # If field not in record, skip + if field_name not in raw_record['data']: + continue + + for fields in raw_record['data'].pop(field_name) or []: + rows[field_name].append( + FieldModel( + patient_cpf=raw_record.get("patient_cpf"), + atendimento_id=raw_record.get("source_id"), + source_updated_at=raw_record.get("source_updated_at"), + **fields + ) + ) + + return tuple(rows.values()) \ No newline at end of file diff --git a/api/datalake/models.py b/api/datalake/models.py index 6d0525a..08b5d4a 100644 --- a/api/datalake/models.py +++ b/api/datalake/models.py @@ -7,33 +7,36 @@ from pydantic import BaseModel +# =============== +# SMSRio +# =============== class SMSRioPaciente(BaseModel): patient_cpf: str source_updated_at: str source_id: Optional[str] - data__nome: str - data__nome_mae: str - data__nome_pai: str - data__dt_nasc: str - data__sexo: str - data__racaCor: str - data__nacionalidade: str - data__obito: str + data__nome: Optional[str] + data__nome_mae: Optional[str] + data__nome_pai: Optional[str] + data__dt_nasc: Optional[str] + data__sexo: Optional[str] + data__racaCor: Optional[str] + data__nacionalidade: Optional[str] + data__obito: Optional[str] data__dt_obito: Optional[str] data__end_tp_logrado_cod: Optional[str] - data__end_logrado: str - data__end_numero: str + data__end_logrado: Optional[str] + data__end_numero: Optional[str] data__end_comunidade: Optional[str] - data__end_complem: str - data__end_bairro: str - data__end_cep: str + data__end_complem: Optional[str] + data__end_bairro: Optional[str] + data__end_cep: Optional[str] data__cod_mun_res: Optional[str] - data__uf_res: str - data__cod_mun_nasc: str - data__uf_nasc: str + data__uf_res: Optional[str] + data__cod_mun_nasc: Optional[str] + data__uf_nasc: Optional[str] data__cod_pais_nasc: Optional[str] data__email: Optional[str] - data__timestamp: str + 
data__timestamp: Optional[str] data__cns_provisorio: list[str] data__telefones: list[str] @@ -43,7 +46,7 @@ class Config: partition_column = "source_updated_at" -class SMSRioTelefones(BaseModel): +class SMSRioTelefone(BaseModel): patient_cpf: str value: str source_updated_at: str @@ -63,4 +66,235 @@ class Config: dataset_id = "brutos_plataforma_smsrio" table_id = "paciente_cns_eventos" partition_column = "source_updated_at" - \ No newline at end of file + + +# =============== +# Vitacare +# =============== +class VitacarePaciente(BaseModel): + patient_cpf: str + patient_code: str + source_updated_at: str + source_id: Optional[str] + data__ap: Optional[str] + data__id: Optional[str] + data__cep: Optional[str] + data__cns: Optional[str] + data__cpf: Optional[str] + data__dnv: Optional[str] + data__nis: Optional[str] + data__cnes: Optional[str] + data__nome: Optional[str] + data__sexo: Optional[str] + data__email: Optional[str] + data__obito: Optional[str] + data__bairro: Optional[str] + data__equipe: Optional[str] + data__nPront: Optional[str] + data__comodos: Optional[str] + data__nomeMae: Optional[str] + data__nomePai: Optional[str] + data__racaCor: Optional[str] + data__unidade: Optional[str] + data__ocupacao: Optional[str] + data__religiao: Optional[str] + data__telefone: Optional[str] + data__ineEquipe: Optional[str] + data__microarea: Optional[str] + data__logradouro: Optional[str] + data__nomeSocial: Optional[str] + data__destinoLixo: Optional[str] + data__luzEletrica: Optional[str] + data__codigoEquipe: Optional[str] + data__dataCadastro: Optional[str] + data__escolaridade: Optional[str] + data__tempoMoradia: Optional[str] + data__nacionalidade: Optional[str] + data__rendaFamiliar: Optional[str] + data__tipoDomicilio: Optional[str] + data__dataNascimento: Optional[str] + data__paisNascimento: Optional[str] + data__tipoLogradouro: Optional[str] + data__tratamentoAgua: Optional[str] + data__emSituacaoDeRua: Optional[str] + data__frequentaEscola: Optional[str] + 
data__meiosTransporte: Optional[str] + data__situacaoUsuario: Optional[str] + data__doencasCondicoes: Optional[str] + data__estadoNascimento: Optional[str] + data__estadoResidencia: Optional[str] + data__identidadeGenero: Optional[str] + data__meiosComunicacao: Optional[str] + data__orientacaoSexual: Optional[str] + data__possuiFiltroAgua: Optional[str] + data__possuiPlanoSaude: Optional[str] + data__situacaoFamiliar: Optional[str] + data__territorioSocial: Optional[str] + data__abastecimentoAgua: Optional[str] + data__animaisNoDomicilio: Optional[str] + data__cadastroPermanente: Optional[str] + data__familiaLocalizacao: Optional[str] + data__emCasoDoencaProcura: Optional[str] + data__municipioNascimento: Optional[str] + data__municipioResidencia: Optional[str] + data__responsavelFamiliar: Optional[str] + data__esgotamentoSanitario: Optional[str] + data__situacaoMoradiaPosse: Optional[str] + data__situacaoProfissional: Optional[str] + data__vulnerabilidadeSocial: Optional[str] + data__familiaBeneficiariaCfc: Optional[str] + data__dataAtualizacaoCadastro: Optional[str] + data__participaGrupoComunitario: Optional[str] + data__relacaoResponsavelFamiliar: Optional[str] + data__membroComunidadeTradicional: Optional[str] + data__dataAtualizacaoVinculoEquipe: Optional[str] + data__familiaBeneficiariaAuxilioBrasil: Optional[str] + data__criancaMatriculadaCrechePreEscola: Optional[str] + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "paciente_eventos" + partition_column = "source_updated_at" + + +class VitacareAtendimento(BaseModel): + patient_cpf: str + patient_code: str + source_updated_at: str + source_id: str + id: Optional[str] + data__unidade_ap: Optional[str] + data__unidade_cnes: Optional[str] + data__profissional__cns: Optional[str] + data__profissional__cpf: Optional[str] + data__profissional__nome: Optional[str] + data__profissional__cbo: Optional[str] + data__profissional__equipe__nome: Optional[str] + 
data__profissional__equipe__cod_equipe: Optional[str] + data__profissional__equipe__cod_ine: Optional[str] + data__data_consulta: Optional[str] + data__tipo_consulta: Optional[str] + data__eh_coleta: Optional[str] + data__motivo: Optional[str] + data__observacao: Optional[str] + data__avaliacao: Optional[str] + data__evolucao: Optional[str] + data__observacoes_atendimento: Optional[str] + data__condicoes: Optional[str] + data__prescricoes: Optional[str] + data__exames_solicitados: Optional[str] + data__vacinas: Optional[str] + data__alergias_anamnese: Optional[str] + data__indicadores: Optional[str] + data__encaminhamentos: Optional[str] + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "atendimento_eventos" + partition_column = "source_updated_at" + + +class VitacareCondicao(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + cod_cid10: str + cod_ciap2: str + estado: str + data_diagnostico: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "condicoes_eventos" + partition_column = "source_updated_at" + + +class VitacareAlergia(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + descricao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "alergia_anamnese_eventos" + partition_column = "source_updated_at" + + +class VitacareEncaminhamento(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + descricao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "encaminhamento_eventos" + partition_column = "source_updated_at" + + +class VitacarePrescricao(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome_medicamento: str + cod_medicamento: str + quantidade: str + uso_continuado: bool + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "prescricao_eventos" + partition_column = "source_updated_at" + + +class 
VitacareExameSolicitado(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome_exame: str + cod_exame: str + quantidade: str + material: str + url_resultado: Optional[str] + data_solicitacao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "exames_solicitados_eventos" + partition_column = "source_updated_at" + + +class VitacareVacina(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome_vacina: str + cod_vacina: str + dose: str + lote: str + datahora_aplicacao: str + datahora_registro: str + diff: str + calendario_vacinal_atualizado: bool + dose_vtc: str + tipo_registro: str + estrategia_imunizacao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "vacinas_eventos" + partition_column = "source_updated_at" + + +class VitacareIndicador(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome: str + valor: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "indicadores_eventos" + partition_column = "source_updated_at" \ No newline at end of file diff --git a/api/datalake/utils.py b/api/datalake/utils.py index 360fbed..af1cf5f 100644 --- a/api/datalake/utils.py +++ b/api/datalake/utils.py @@ -1,4 +1,6 @@ +import re import pandas as pd + from loguru import logger @@ -45,29 +47,37 @@ def get_formatter(system: str, entity: str): # Função para aplanar um dicionário def flatten( record: dict, - max_depth: int = 2, + dict_max_depth: int = 2, + list_max_depth: int = 1, depth: int = 0, ) -> dict: """ - Flattens a nested dictionary by concatenating keys with '__' separator. - + Flatten a nested dictionary by concatenating keys with '__' separator. + Args: record (dict): The nested dictionary to be flattened. - max_depth (int, optional): The maximum depth to flatten. Defaults to 2. + dict_max_depth (int, optional): The maximum depth to flatten dictionaries. Defaults to 2. 
+ list_max_depth (int, optional): The maximum depth to flatten lists. Defaults to 1. depth (int, optional): The current depth of recursion. Defaults to 0. - + Returns: dict: The flattened dictionary. """ updated_record = {} for field, content in record.items(): if isinstance(content, dict): - if depth < max_depth: - for key, value in flatten(content, depth=depth + 1).items(): + if depth < dict_max_depth: + flattened = flatten( + content, + depth=depth + 1, + dict_max_depth=dict_max_depth, + list_max_depth=list_max_depth + ) + for key, value in flattened.items(): updated_record[f"{field}__{key}"] = value else: updated_record[field] = str(content) - elif isinstance(content, list) and depth > 1: + elif isinstance(content, list) and depth >= list_max_depth: updated_record[field] = str(content) else: updated_record[field] = content @@ -76,6 +86,16 @@ def flatten( def apply_formatter(records:list[dict], formatter) -> dict: + """ + Applies a formatter function to a list of records and returns a dictionary of formatted tables. + + Args: + records (list[dict]): A list of records to be formatted. + formatter (function): A formatter function that takes a record as input and returns a list of row sets. + + Returns: + dict: A dictionary where the keys are table configurations and the values are pandas DataFrames containing the formatted rows. 
+ """ tables = {} for record in records: From 7e254e9ad3d8801ab83631ee7e590a63d47adfa3 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 16:55:41 -0300 Subject: [PATCH 03/10] chore: automatically reformat file --- api/datalake/formatters.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/api/datalake/formatters.py b/api/datalake/formatters.py index 83d5851..4efc208 100644 --- a/api/datalake/formatters.py +++ b/api/datalake/formatters.py @@ -61,7 +61,7 @@ def format_vitacare_patient( ) -> Tuple[List[SMSRioPaciente]]: # Convert source_updated_at to string raw_record['source_updated_at'] = str(raw_record['source_updated_at']) - + flattened = flatten(raw_record, list_max_depth=0) return ([VitacarePaciente(**flattened)],) @@ -71,15 +71,15 @@ def format_vitacare_patient( def format_vitacare_encounter( raw_record: dict ) -> Tuple[ - List[VitacareAtendimento], - List[VitacareCondicao], - List[VitacareAlergia], - List[VitacareEncaminhamento], - List[VitacareExameSolicitado], - List[VitacareIndicador], - List[VitacarePrescricao], - List[VitacareVacina], - ]: + List[VitacareAtendimento], + List[VitacareCondicao], + List[VitacareAlergia], + List[VitacareEncaminhamento], + List[VitacareExameSolicitado], + List[VitacareIndicador], + List[VitacarePrescricao], + List[VitacareVacina], +]: # Convert source_updated_at to string raw_record['source_updated_at'] = str(raw_record['source_updated_at']) @@ -125,4 +125,4 @@ def format_vitacare_encounter( ) ) - return tuple(rows.values()) \ No newline at end of file + return tuple(rows.values()) From 310e67836c74966b490a8a6382c32a768cfef64b Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 16:59:33 -0300 Subject: [PATCH 04/10] chore: quick docs in datalake files --- api/datalake/formatters.py | 6 +++++- api/datalake/models.py | 3 +-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/api/datalake/formatters.py b/api/datalake/formatters.py index 
4efc208..ae781c1 100644 --- a/api/datalake/formatters.py +++ b/api/datalake/formatters.py @@ -1,5 +1,9 @@ +# -*- coding: utf-8 -*- +# ============================================= +# Formatters that are responsible for converting +# raw JSON records to Datalake table rows. +# ============================================= from typing import List, Tuple - from datalake.utils import flatten, register_formatter from datalake.models import ( SMSRioCnsProvisorio, diff --git a/api/datalake/models.py b/api/datalake/models.py index 08b5d4a..d0f5d31 100644 --- a/api/datalake/models.py +++ b/api/datalake/models.py @@ -2,8 +2,7 @@ # ============================================= # Pydantic Models Representing Datalake Tables # ============================================= -from datetime import date, datetime -from typing import Generic, Optional, List, TypeVar, Any +from typing import Optional from pydantic import BaseModel From bb595589373f0e6fcd8f296d4c3b92100396a475 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 17:01:22 -0300 Subject: [PATCH 05/10] feat: rollback of upload_to_datalake option --- api/app/routers/entities_raw.py | 38 +++++++++++++++++---------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/api/app/routers/entities_raw.py b/api/app/routers/entities_raw.py index d9b2115..f9f50a1 100644 --- a/api/app/routers/entities_raw.py +++ b/api/app/routers/entities_raw.py @@ -80,6 +80,7 @@ async def create_raw_data( entity_name: Literal["patientrecords", "patientconditions", "encounter"], current_user: Annotated[User, Depends(get_current_active_user)], raw_data: RawDataListModel, + upload_to_datalake: bool = False, ) -> BulkInsertOutputModel: records = raw_data.dict().get("data_list") @@ -88,25 +89,26 @@ async def create_raw_data( # ==================== # SEND TO DATALAKE # ==================== - formatter = get_formatter( - system=data_source.system.value, - entity=entity_name - ) - uploader = DatalakeUploader( - 
biglake_table=True, - dataset_is_public=False, - dump_mode="append", - force_unique_file_name=True, - ) - - for table_config, dataframe in apply_formatter(records, formatter).items(): - uploader.upload( - dataframe=dataframe, - dataset_id=table_config.dataset_id, - table_id=table_config.table_id, - partition_by_date=True, - partition_column=table_config.partition_column, + if upload_to_datalake: + formatter = get_formatter( + system=data_source.system.value, + entity=entity_name ) + uploader = DatalakeUploader( + biglake_table=True, + dataset_is_public=False, + dump_mode="append", + force_unique_file_name=True, + ) + + for table_config, dataframe in apply_formatter(records, formatter).items(): + uploader.upload( + dataframe=dataframe, + dataset_id=table_config.dataset_id, + table_id=table_config.table_id, + partition_by_date=True, + partition_column=table_config.partition_column, + ) # ==================== # SAVE IN HCI DATABASE From 69c49292a94bb10d605e9ae5d41d3f40c098b5b3 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 17:02:59 -0300 Subject: [PATCH 06/10] feat: Update cod_ciap2 field to be optional in VitacareCondicao model --- api/datalake/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/datalake/models.py b/api/datalake/models.py index d0f5d31..204ae05 100644 --- a/api/datalake/models.py +++ b/api/datalake/models.py @@ -198,7 +198,7 @@ class VitacareCondicao(BaseModel): atendimento_id: str source_updated_at: str cod_cid10: str - cod_ciap2: str + cod_ciap2: Optional[str] estado: str data_diagnostico: str From 43f6331a2cae669be45f95db461df8f47c5a3c80 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 17:06:58 -0300 Subject: [PATCH 07/10] feat: Refactor Datalake Sending logic and improve error handling --- api/app/routers/entities_raw.py | 11 ++++++----- api/datalake/utils.py | 4 +++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/api/app/routers/entities_raw.py 
b/api/app/routers/entities_raw.py index f9f50a1..093b5ed 100644 --- a/api/app/routers/entities_raw.py +++ b/api/app/routers/entities_raw.py @@ -89,11 +89,12 @@ async def create_raw_data( # ==================== # SEND TO DATALAKE # ==================== - if upload_to_datalake: - formatter = get_formatter( - system=data_source.system.value, - entity=entity_name - ) + formatter = get_formatter( + system=data_source.system.value, + entity=entity_name + ) + + if upload_to_datalake and formatter: uploader = DatalakeUploader( biglake_table=True, dataset_is_public=False, diff --git a/api/datalake/utils.py b/api/datalake/utils.py index af1cf5f..56036e9 100644 --- a/api/datalake/utils.py +++ b/api/datalake/utils.py @@ -40,7 +40,9 @@ def get_formatter(system: str, entity: str): Raises: AssertionError: If the formatter for the specified system and entity is not found. """ - assert (system, entity) in formatters, f"Formatter for {system} - {entity} not found" + formatter = formatters.get((system, entity)) + if not formatter: + logger.warning(f"No formatter implemented for ({system},{entity})") return formatters.get((system, entity)) From 0e863aa0a31fb7e6d5ce3e6e1ab3bf1c7fb38919 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 17:07:30 -0300 Subject: [PATCH 08/10] feat: Fix get_formatter function in datalake/utils.py --- api/datalake/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/datalake/utils.py b/api/datalake/utils.py index 56036e9..2866a03 100644 --- a/api/datalake/utils.py +++ b/api/datalake/utils.py @@ -43,7 +43,7 @@ def get_formatter(system: str, entity: str): formatter = formatters.get((system, entity)) if not formatter: logger.warning(f"No formatter implemented for ({system},{entity})") - return formatters.get((system, entity)) + return formatter # Função para aplanar um dicionário From 9f23398920b15cebdb0d9ec4409dca12de4b5b70 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 17:07:52 -0300 Subject: 
[PATCH 09/10] Fix get_formatter function in datalake/utils.py --- api/datalake/utils.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/api/datalake/utils.py b/api/datalake/utils.py index 2866a03..d3704d3 100644 --- a/api/datalake/utils.py +++ b/api/datalake/utils.py @@ -36,9 +36,6 @@ def get_formatter(system: str, entity: str): Returns: function: The formatter function for the specified system and entity. - - Raises: - AssertionError: If the formatter for the specified system and entity is not found. """ formatter = formatters.get((system, entity)) if not formatter: From bbceae315352e7139efb94a63461ec1d0fd003bc Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 17:44:11 -0300 Subject: [PATCH 10/10] feat: compatibility with historic vitacare data --- api/datalake/formatters.py | 9 ++++-- api/datalake/models.py | 58 +++++++++++++++++++++++++++++++++++++- api/datalake/utils.py | 32 ++++++++++++--------- 3 files changed, 83 insertions(+), 16 deletions(-) diff --git a/api/datalake/formatters.py b/api/datalake/formatters.py index ae781c1..2c28842 100644 --- a/api/datalake/formatters.py +++ b/api/datalake/formatters.py @@ -10,6 +10,7 @@ SMSRioPaciente, SMSRioTelefone, VitacarePaciente, + VitacarePacienteHistorico, VitacareAtendimento, VitacareCondicao, VitacareAlergia, @@ -62,13 +63,17 @@ def format_smsrio_patient( @register_formatter(system="vitacare", entity="patientrecords") def format_vitacare_patient( raw_record: dict -) -> Tuple[List[SMSRioPaciente]]: +) -> Tuple[List[VitacarePaciente | VitacarePacienteHistorico]]: # Convert source_updated_at to string raw_record['source_updated_at'] = str(raw_record['source_updated_at']) flattened = flatten(raw_record, list_max_depth=0) - return ([VitacarePaciente(**flattened)],) + # Temp criterion to discriminate between Routine and Historic format + if 'AP' in raw_record['data'].keys(): + return ([VitacarePacienteHistorico(**flattened)],) + else: + return ([VitacarePaciente(**flattened)],) 
@register_formatter(system="vitacare", entity="encounter") diff --git a/api/datalake/models.py b/api/datalake/models.py index 204ae05..4839441 100644 --- a/api/datalake/models.py +++ b/api/datalake/models.py @@ -1,6 +1,12 @@ # -*- coding: utf-8 -*- # ============================================= -# Pydantic Models Representing Datalake Tables +# TABLE MODELS +# ============================================= +# - Pydantic Models Representing Datalake Tables. +# These models describe the format that every +# row sent to the Datalake must follow. +# - Also, configuration of the table name, +# dataset, etc must be provided. # ============================================= from typing import Optional from pydantic import BaseModel @@ -156,6 +162,56 @@ class Config: partition_column = "source_updated_at" +class VitacarePacienteHistorico(BaseModel): + patient_cpf: str + patient_code: str + source_updated_at: str + source_id: Optional[str] + data__AP: Optional[str] + data__SEXO: Optional[str] + data__HIST_CID: Optional[str] + data__RACA_COR: Optional[str] + data__RELIGIAO: Optional[str] + data__cpfPaciente: Optional[str] + data__ESCOLARIDADE: Optional[str] + data__dataConsulta: Optional[str] + data__NACIONALIDADE: Optional[str] + data__FREQUENTA_ESCOLA: Optional[str] + data__SITUACAO_USUARIO: Optional[str] + data__TELEFONE_CONTATO: Optional[str] + data__dataNascPaciente: Optional[str] + data__SITUACAO_FAMILIAR: Optional[str] + data__TERRITORIO_SOCIAL: Optional[str] + data__NUMERO_CNES_UNIDADE: Optional[str] + data__N_DE_CONSULTAS_2018: Optional[str] + data__N_DE_CONSULTAS_2019: Optional[str] + data__N_DE_CONSULTAS_2020: Optional[str] + data__N_DE_CONSULTAS_2021: Optional[str] + data__N_DE_CONSULTAS_2022: Optional[str] + data__N_DE_CONSULTAS_2023: Optional[str] + data__PACIENTE_TEMPORARIO: Optional[str] + data__NOME_UNIDADE_DE_SAUDE: Optional[str] + data__POSSUI_PLANO_DE_SAUDE: Optional[str] + data__SITUACAO_PROFISSIONAL: Optional[str] + data__MUNICIPIO_DE_NASCIMENTO: 
Optional[str] + data__N_DE_PROCEDIMENTOS_2018: Optional[str] + data__N_DE_PROCEDIMENTOS_2019: Optional[str] + data__N_DE_PROCEDIMENTOS_2020: Optional[str] + data__N_DE_PROCEDIMENTOS_2021: Optional[str] + data__N_DE_PROCEDIMENTOS_2022: Optional[str] + data__N_DE_PROCEDIMENTOS_2023: Optional[str] + data__PACIENTE_SITUACAO_RUA: Optional[str] + data__CODIGO_DA_EQUIPE_DE_SAUDE: Optional[str] + data__NOME_DA_PESSOA_CADASTRADA: Optional[str] + data__N_CNS_DA_PESSOA_CADASTRADA: Optional[str] + data__NOME_DA_MAE_PESSOA_CADASTRADA: Optional[str] + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "paciente_historico_eventos" + partition_column = "source_updated_at" + + class VitacareAtendimento(BaseModel): patient_cpf: str patient_code: str diff --git a/api/datalake/utils.py b/api/datalake/utils.py index d3704d3..2ff94dd 100644 --- a/api/datalake/utils.py +++ b/api/datalake/utils.py @@ -4,8 +4,8 @@ from loguru import logger -# Dicionário global para armazenar os formatters -formatters = {} +REGISTERED_FORMATTERS = {} + def register_formatter(system: str, entity: str): """ @@ -19,13 +19,13 @@ def register_formatter(system: str, entity: str): function: The decorated function. """ def decorator(func): - logger.info(f"Registering formatter for {system} - {entity}: {func.__name__}") - formatters[(system, entity)] = func + logger.info( + f"Registering formatter for {system} - {entity}: {func.__name__}") + REGISTERED_FORMATTERS[(system, entity)] = func return func return decorator -# Função para acessar o formatter def get_formatter(system: str, entity: str): """ Retrieves the formatter function for the specified system and entity. @@ -37,7 +37,7 @@ def get_formatter(system: str, entity: str): Returns: function: The formatter function for the specified system and entity. 
""" - formatter = formatters.get((system, entity)) + formatter = REGISTERED_FORMATTERS.get((system, entity)) if not formatter: logger.warning(f"No formatter implemented for ({system},{entity})") return formatter @@ -67,7 +67,7 @@ def flatten( if isinstance(content, dict): if depth < dict_max_depth: flattened = flatten( - content, + content, depth=depth + 1, dict_max_depth=dict_max_depth, list_max_depth=list_max_depth @@ -80,11 +80,11 @@ def flatten( updated_record[field] = str(content) else: updated_record[field] = content - + return updated_record -def apply_formatter(records:list[dict], formatter) -> dict: +def apply_formatter(records: list[dict], formatter) -> dict: """ Applies a formatter function to a list of records and returns a dictionary of formatted tables. @@ -98,14 +98,20 @@ def apply_formatter(records:list[dict], formatter) -> dict: tables = {} for record in records: - for row_set in formatter(record): + try: + formatted_record = formatter(record) + except Exception as e: + logger.error(f"An error occured during the process {e}") + raise e + + for row_set in formatted_record: for row in row_set: if row.Config in tables: tables[row.Config].append(row) else: tables[row.Config] = [row] - + for table_config, rows in tables.items(): tables[table_config] = pd.DataFrame([row.dict() for row in rows]) - - return tables \ No newline at end of file + + return tables