From 68b60a2f492291a4665dcf04e7a515fcc79723cc Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 25 Jul 2024 16:54:19 -0300 Subject: [PATCH] feat: implementing encounter formatters --- api/datalake/formatters.py | 90 +++++++++++- api/datalake/models.py | 272 ++++++++++++++++++++++++++++++++++--- api/datalake/utils.py | 36 +++-- 3 files changed, 367 insertions(+), 31 deletions(-) diff --git a/api/datalake/formatters.py b/api/datalake/formatters.py index bf5d908..83d5851 100644 --- a/api/datalake/formatters.py +++ b/api/datalake/formatters.py @@ -4,14 +4,23 @@ from datalake.models import ( SMSRioCnsProvisorio, SMSRioPaciente, - SMSRioTelefones, + SMSRioTelefone, + VitacarePaciente, + VitacareAtendimento, + VitacareCondicao, + VitacareAlergia, + VitacareEncaminhamento, + VitacareExameSolicitado, + VitacareIndicador, + VitacarePrescricao, + VitacareVacina, ) @register_formatter(system="smsrio", entity="patientrecords") def format_smsrio_patient( raw_record: dict -) -> Tuple[List[SMSRioPaciente], List[SMSRioTelefones], List[SMSRioCnsProvisorio]]: +) -> Tuple[List[SMSRioPaciente], List[SMSRioTelefone], List[SMSRioCnsProvisorio]]: # Convert source_updated_at to string raw_record['source_updated_at'] = str(raw_record['source_updated_at']) @@ -27,7 +36,7 @@ def format_smsrio_patient( # Create Tables for List Fields for field_name, FieldModel in [ - ('telefones', SMSRioTelefones), + ('telefones', SMSRioTelefone), ('cns_provisorio', SMSRioCnsProvisorio) ]: # If field not in record, skip @@ -43,4 +52,77 @@ def format_smsrio_patient( ) ) - return rows['pacientes'], rows['telefones'], rows['cns_provisorio'] \ No newline at end of file + return rows['pacientes'], rows['telefones'], rows['cns_provisorio'] + + +@register_formatter(system="vitacare", entity="patientrecords") +def format_vitacare_patient( + raw_record: dict +) -> Tuple[List[SMSRioPaciente]]: + # Convert source_updated_at to string + raw_record['source_updated_at'] = str(raw_record['source_updated_at']) + + flattened = flatten(raw_record, list_max_depth=0) + + return ([VitacarePaciente(**flattened)],) + + +@register_formatter(system="vitacare", entity="encounter") +def format_vitacare_encounter( + raw_record: dict +) -> Tuple[ + List[VitacareAtendimento], + List[VitacareCondicao], + List[VitacareAlergia], + List[VitacareEncaminhamento], + List[VitacareExameSolicitado], + List[VitacareIndicador], + List[VitacarePrescricao], + List[VitacareVacina], + ]: + # Convert source_updated_at to string + raw_record['source_updated_at'] = str(raw_record['source_updated_at']) + + # Flatten Record + flattened = flatten( + raw_record, + dict_max_depth=3, + ) + + # Initialize Tables + rows = { + "encounter": [VitacareAtendimento(**flattened)], + "condicoes": [], + "alergias_anamnese": [], + "encaminhamentos": [], + "exames_solicitados": [], + "indicadores": [], + "prescricoes": [], + "vacinas": [], + } + + # Create Tables for List Fields + for field_name, FieldModel in [ + ('condicoes', VitacareCondicao), + ('alergias_anamnese', VitacareAlergia), + ('encaminhamentos', VitacareEncaminhamento), + ('exames_solicitados', VitacareExameSolicitado), + ('indicadores', VitacareIndicador), + ('prescricoes', VitacarePrescricao), + ('vacinas', VitacareVacina) + ]: + # If field not in record, skip + if field_name not in raw_record['data']: + continue + + for fields in raw_record['data'].pop(field_name) or []: + rows[field_name].append( + FieldModel( + patient_cpf=raw_record.get("patient_cpf"), + atendimento_id=raw_record.get("source_id"), + source_updated_at=raw_record.get("source_updated_at"), + **fields + ) + ) + + return tuple(rows.values()) \ No newline at end of file diff --git a/api/datalake/models.py b/api/datalake/models.py index 6d0525a..08b5d4a 100644 --- a/api/datalake/models.py +++ b/api/datalake/models.py @@ -7,33 +7,36 @@ from pydantic import BaseModel +# =============== +# SMSRio +# =============== class SMSRioPaciente(BaseModel): patient_cpf: str source_updated_at: str source_id: Optional[str] - data__nome: str - data__nome_mae: str - data__nome_pai: str - data__dt_nasc: str - data__sexo: str - data__racaCor: str - data__nacionalidade: str - data__obito: str + data__nome: Optional[str] + data__nome_mae: Optional[str] + data__nome_pai: Optional[str] + data__dt_nasc: Optional[str] + data__sexo: Optional[str] + data__racaCor: Optional[str] + data__nacionalidade: Optional[str] + data__obito: Optional[str] data__dt_obito: Optional[str] data__end_tp_logrado_cod: Optional[str] - data__end_logrado: str - data__end_numero: str + data__end_logrado: Optional[str] + data__end_numero: Optional[str] data__end_comunidade: Optional[str] - data__end_complem: str - data__end_bairro: str - data__end_cep: str + data__end_complem: Optional[str] + data__end_bairro: Optional[str] + data__end_cep: Optional[str] data__cod_mun_res: Optional[str] - data__uf_res: str - data__cod_mun_nasc: str - data__uf_nasc: str + data__uf_res: Optional[str] + data__cod_mun_nasc: Optional[str] + data__uf_nasc: Optional[str] data__cod_pais_nasc: Optional[str] data__email: Optional[str] - data__timestamp: str + data__timestamp: Optional[str] data__cns_provisorio: list[str] data__telefones: list[str] @@ -43,7 +46,7 @@ class Config: partition_column = "source_updated_at" -class SMSRioTelefones(BaseModel): +class SMSRioTelefone(BaseModel): patient_cpf: str value: str source_updated_at: str @@ -63,4 +66,235 @@ class Config: dataset_id = "brutos_plataforma_smsrio" table_id = "paciente_cns_eventos" partition_column = "source_updated_at" - \ No newline at end of file + + +# =============== +# Vitacare +# =============== +class VitacarePaciente(BaseModel): + patient_cpf: str + patient_code: str + source_updated_at: str + source_id: Optional[str] + data__ap: Optional[str] + data__id: Optional[str] + data__cep: Optional[str] + data__cns: Optional[str] + data__cpf: Optional[str] + data__dnv: Optional[str] + data__nis: Optional[str] + data__cnes: Optional[str] + data__nome: Optional[str] + data__sexo: Optional[str] + data__email: Optional[str] + data__obito: Optional[str] + data__bairro: Optional[str] + data__equipe: Optional[str] + data__nPront: Optional[str] + data__comodos: Optional[str] + data__nomeMae: Optional[str] + data__nomePai: Optional[str] + data__racaCor: Optional[str] + data__unidade: Optional[str] + data__ocupacao: Optional[str] + data__religiao: Optional[str] + data__telefone: Optional[str] + data__ineEquipe: Optional[str] + data__microarea: Optional[str] + data__logradouro: Optional[str] + data__nomeSocial: Optional[str] + data__destinoLixo: Optional[str] + data__luzEletrica: Optional[str] + data__codigoEquipe: Optional[str] + data__dataCadastro: Optional[str] + data__escolaridade: Optional[str] + data__tempoMoradia: Optional[str] + data__nacionalidade: Optional[str] + data__rendaFamiliar: Optional[str] + data__tipoDomicilio: Optional[str] + data__dataNascimento: Optional[str] + data__paisNascimento: Optional[str] + data__tipoLogradouro: Optional[str] + data__tratamentoAgua: Optional[str] + data__emSituacaoDeRua: Optional[str] + data__frequentaEscola: Optional[str] + data__meiosTransporte: Optional[str] + data__situacaoUsuario: Optional[str] + data__doencasCondicoes: Optional[str] + data__estadoNascimento: Optional[str] + data__estadoResidencia: Optional[str] + data__identidadeGenero: Optional[str] + data__meiosComunicacao: Optional[str] + data__orientacaoSexual: Optional[str] + data__possuiFiltroAgua: Optional[str] + data__possuiPlanoSaude: Optional[str] + data__situacaoFamiliar: Optional[str] + data__territorioSocial: Optional[str] + data__abastecimentoAgua: Optional[str] + data__animaisNoDomicilio: Optional[str] + data__cadastroPermanente: Optional[str] + data__familiaLocalizacao: Optional[str] + data__emCasoDoencaProcura: Optional[str] + data__municipioNascimento: Optional[str] + data__municipioResidencia: Optional[str] + data__responsavelFamiliar: Optional[str] + data__esgotamentoSanitario: Optional[str] + data__situacaoMoradiaPosse: Optional[str] + data__situacaoProfissional: Optional[str] + data__vulnerabilidadeSocial: Optional[str] + data__familiaBeneficiariaCfc: Optional[str] + data__dataAtualizacaoCadastro: Optional[str] + data__participaGrupoComunitario: Optional[str] + data__relacaoResponsavelFamiliar: Optional[str] + data__membroComunidadeTradicional: Optional[str] + data__dataAtualizacaoVinculoEquipe: Optional[str] + data__familiaBeneficiariaAuxilioBrasil: Optional[str] + data__criancaMatriculadaCrechePreEscola: Optional[str] + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "paciente_eventos" + partition_column = "source_updated_at" + + +class VitacareAtendimento(BaseModel): + patient_cpf: str + patient_code: str + source_updated_at: str + source_id: str + id: Optional[str] + data__unidade_ap: Optional[str] + data__unidade_cnes: Optional[str] + data__profissional__cns: Optional[str] + data__profissional__cpf: Optional[str] + data__profissional__nome: Optional[str] + data__profissional__cbo: Optional[str] + data__profissional__equipe__nome: Optional[str] + data__profissional__equipe__cod_equipe: Optional[str] + data__profissional__equipe__cod_ine: Optional[str] + data__data_consulta: Optional[str] + data__tipo_consulta: Optional[str] + data__eh_coleta: Optional[str] + data__motivo: Optional[str] + data__observacao: Optional[str] + data__avaliacao: Optional[str] + data__evolucao: Optional[str] + data__observacoes_atendimento: Optional[str] + data__condicoes: Optional[str] + data__prescricoes: Optional[str] + data__exames_solicitados: Optional[str] + data__vacinas: Optional[str] + data__alergias_anamnese: Optional[str] + data__indicadores: Optional[str] + data__encaminhamentos: Optional[str] + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "atendimento_eventos" + partition_column = "source_updated_at" + + +class VitacareCondicao(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + cod_cid10: str + cod_ciap2: str + estado: str + data_diagnostico: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "condicoes_eventos" + partition_column = "source_updated_at" + + +class VitacareAlergia(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + descricao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "alergia_anamnese_eventos" + partition_column = "source_updated_at" + + +class VitacareEncaminhamento(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + descricao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "encaminhamento_eventos" + partition_column = "source_updated_at" + + +class VitacarePrescricao(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome_medicamento: str + cod_medicamento: str + quantidade: str + uso_continuado: bool + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "prescricao_eventos" + partition_column = "source_updated_at" + + +class VitacareExameSolicitado(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome_exame: str + cod_exame: str + quantidade: str + material: str + url_resultado: Optional[str] + data_solicitacao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "exames_solicitados_eventos" + partition_column = "source_updated_at" + + +class VitacareVacina(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome_vacina: str + cod_vacina: str + dose: str + lote: str + datahora_aplicacao: str + datahora_registro: str + diff: str + calendario_vacinal_atualizado: bool + dose_vtc: str + tipo_registro: str + estrategia_imunizacao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "vacinas_eventos" + partition_column = "source_updated_at" + + +class VitacareIndicador(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome: str + valor: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "indicadores_eventos" + partition_column = "source_updated_at" \ No newline at end of file diff --git a/api/datalake/utils.py b/api/datalake/utils.py index 360fbed..af1cf5f 100644 --- a/api/datalake/utils.py +++ b/api/datalake/utils.py @@ -1,4 +1,6 @@ +import re import pandas as pd + from loguru import logger @@ -45,29 +47,37 @@ def get_formatter(system: str, entity: str): # Função para aplanar um dicionário def flatten( record: dict, - max_depth: int = 2, + dict_max_depth: int = 2, + list_max_depth: int = 1, depth: int = 0, ) -> dict: """ - Flattens a nested dictionary by concatenating keys with '__' separator. - + Flatten a nested dictionary by concatenating keys with '__' separator. + Args: record (dict): The nested dictionary to be flattened. - max_depth (int, optional): The maximum depth to flatten. Defaults to 2. + dict_max_depth (int, optional): The maximum depth to flatten dictionaries. Defaults to 2. + list_max_depth (int, optional): The maximum depth to flatten lists. Defaults to 1. depth (int, optional): The current depth of recursion. Defaults to 0. - + Returns: dict: The flattened dictionary. """ updated_record = {} for field, content in record.items(): if isinstance(content, dict): - if depth < max_depth: - for key, value in flatten(content, depth=depth + 1).items(): + if depth < dict_max_depth: + flattened = flatten( + content, + depth=depth + 1, + dict_max_depth=dict_max_depth, + list_max_depth=list_max_depth + ) + for key, value in flattened.items(): updated_record[f"{field}__{key}"] = value else: updated_record[field] = str(content) - elif isinstance(content, list) and depth > 1: + elif isinstance(content, list) and depth >= list_max_depth: updated_record[field] = str(content) else: updated_record[field] = content @@ -76,6 +86,16 @@ def flatten( def apply_formatter(records:list[dict], formatter) -> dict: + """ + Applies a formatter function to a list of records and returns a dictionary of formatted tables. + + Args: + records (list[dict]): A list of records to be formatted. + formatter (function): A formatter function that takes a record as input and returns a list of row sets. + + Returns: + dict: A dictionary where the keys are table configurations and the values are pandas DataFrames containing the formatted rows. + """ tables = {} for record in records: