diff --git a/api/app/routers/entities_raw.py b/api/app/routers/entities_raw.py index 39614f1..093b5ed 100644 --- a/api/app/routers/entities_raw.py +++ b/api/app/routers/entities_raw.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import asyncpg +import itertools from datetime import ( datetime as dt, @@ -14,12 +15,9 @@ from app.dependencies import get_current_active_user from app.models import User, RawPatientRecord, RawPatientCondition, DataSource, RawEncounter from app.enums import SystemEnum -from app.datalake import DatalakeUploader -from app.utils import ( - unnester_encounter, - unnester_patientconditions, - unnester_patientrecords -) + +from datalake.uploader import DatalakeUploader +from datalake.utils import get_formatter, apply_formatter router = APIRouter(prefix="/raw", tags=["Entidades RAW (Formato Raw/Bruto)"]) @@ -27,15 +25,12 @@ entities_config = { "patientrecords": { "class": RawPatientRecord, - "unnester": unnester_patientrecords, }, "patientconditions": { "class": RawPatientCondition, - "unnester": unnester_patientconditions, }, "encounter": { "class": RawEncounter, - "unnester": unnester_encounter, }, } @@ -85,20 +80,21 @@ async def create_raw_data( entity_name: Literal["patientrecords", "patientconditions", "encounter"], current_user: Annotated[User, Depends(get_current_active_user)], raw_data: RawDataListModel, - upload_to_datalake: bool = True, + upload_to_datalake: bool = False, ) -> BulkInsertOutputModel: - Entity = entities_config[entity_name]["class"] - unnester = entities_config[entity_name]["unnester"] + records = raw_data.dict().get("data_list") + data_source = await DataSource.get(cnes=raw_data.cnes) - raw_data = raw_data.dict() - cnes = raw_data.pop("cnes") - records = raw_data.pop("data_list") + # ==================== + # SEND TO DATALAKE + # ==================== + formatter = get_formatter( + system=data_source.system.value, + entity=entity_name + ) - # Get DataSource - data_source = await DataSource.get(cnes=cnes) - - if 
upload_to_datalake: + if upload_to_datalake and formatter: uploader = DatalakeUploader( biglake_table=True, dataset_is_public=False, @@ -106,16 +102,19 @@ async def create_raw_data( force_unique_file_name=True, ) - for name, dataframe in unnester(records): + for table_config, dataframe in apply_formatter(records, formatter).items(): uploader.upload( dataframe=dataframe, - dataset_id=datalake_config[data_source.system], - table_id=f"{name}_eventos", + dataset_id=table_config.dataset_id, + table_id=table_config.table_id, partition_by_date=True, - partition_column="updated_at", + partition_column=table_config.partition_column, ) - # Send to HCI Database + # ==================== + # SAVE IN HCI DATABASE + # ==================== + Entity = entities_config[entity_name]["class"] try: records_to_create = [] for record in records: @@ -126,7 +125,7 @@ async def create_raw_data( source_updated_at=record.get("source_updated_at"), source_id=record.get("source_id"), data=record.get("data"), - data_source=await DataSource.get(cnes=cnes), + data_source=data_source, creator=current_user, ) ) @@ -134,7 +133,9 @@ async def create_raw_data( return HTMLResponse(status_code=400, content=str(e)) try: new_records = await Entity.bulk_create(records_to_create, ignore_conflicts=True) - return {"count": len(new_records)} + return { + "count": len(new_records), + } except asyncpg.exceptions.DeadlockDetectedError as e: return HTMLResponse(status_code=400, content=str(e)) diff --git a/api/app/utils.py b/api/app/utils.py index 3c4364f..ed00ed7 100644 --- a/api/app/utils.py +++ b/api/app/utils.py @@ -2,6 +2,7 @@ from datetime import datetime, timedelta import hashlib import json +from typing import Literal import jwt import copy import pandas as pd @@ -121,81 +122,4 @@ async def get_instance(Model, table, slug=None, code=None): elif slug: table[slug] = await Model.get_or_none(slug=slug) - return table[slug] - - -def unnester_encounter(payloads: dict) -> pd.DataFrame: - tables = {} - - for payload 
in copy.deepcopy(payloads): - for field in [ - "vacinas", - "condicoes", - "encaminhamentos", - "indicadores", - "alergias_anamnese", - "exames_solicitados", - "prescricoes", - ]: - for row in payload["data"].pop(field, []): - row["atendimento_id"] = payload["source_id"] - row["updated_at"] = payload["source_updated_at"] - tables[field] = tables.get(field, []) + [row] - - payload["data"]['id'] = payload["source_id"] - payload["data"]["updated_at"] = payload["source_updated_at"] - payload["data"]["patient_cpf"] = payload["patient_cpf"] - payload["data"]["patient_code"] = payload["patient_code"] - - tables["atendimento"] = tables.get("atendimento", []) + [payload["data"]] - - result = [] - for table_name, rows in tables.items(): - result.append((table_name, pd.DataFrame(rows))) - - return result - - -def unnester_patientrecords(payloads: dict) -> pd.DataFrame: - tables = {} - - for payload in copy.deepcopy(payloads): - for field in [ - "telefones", - "cns_provisorio" - ]: - for row in payload["data"].pop(field, []): - row["patient_code"] = payload["patient_code"] - row["updated_at"] = payload["source_updated_at"] - tables[field] = tables.get(field, []) + [row] - - payload["data"]['id'] = payload["patient_code"] - payload["data"]["updated_at"] = payload["source_updated_at"] - payload["data"]["patient_cpf"] = payload["patient_cpf"] - payload["data"]["patient_code"] = payload["patient_code"] - - tables["paciente"] = tables.get("paciente", []) + [payload["data"]] - - result = [] - for table_name, rows in tables.items(): - result.append((table_name, pd.DataFrame(rows))) - - return result - - -def unnester_patientconditions(payloads: dict) -> pd.DataFrame: - tables = {} - - for payload in copy.deepcopy(payloads): - payload["data"]['id'] = payload["patient_code"] - payload["data"]["updated_at"] = payload["source_updated_at"] - payload["data"]["patient_cpf"] = payload["patient_cpf"] - payload["data"]["patient_code"] = payload["patient_code"] - - tables["resumo_diagnostico"] 
# -*- coding: utf-8 -*-
# =============================================
# Formatters that are responsible for converting
# raw JSON records to Datalake table rows.
#
# Each formatter receives ONE raw record (the dict shape produced by the
# /raw/{entity_name} endpoint: patient_cpf, patient_code, source_id,
# source_updated_at, data) and returns a tuple of row lists, one list per
# destination Datalake table. Formatters register themselves via
# @register_formatter(system, entity) so the API route can look them up.
# =============================================
import copy
from typing import List, Tuple

from datalake.utils import flatten, register_formatter
from datalake.models import (
    SMSRioCnsProvisorio,
    SMSRioPaciente,
    SMSRioTelefone,
    VitacarePaciente,
    VitacarePacienteHistorico,
    VitacareAtendimento,
    VitacareCondicao,
    VitacareAlergia,
    VitacareEncaminhamento,
    VitacareExameSolicitado,
    VitacareIndicador,
    VitacarePrescricao,
    VitacareVacina,
)


@register_formatter(system="smsrio", entity="patientrecords")
def format_smsrio_patient(
    raw_record: dict
) -> Tuple[List[SMSRioPaciente], List[SMSRioTelefone], List[SMSRioCnsProvisorio]]:
    """Convert one raw SMSRio patient record into Datalake rows.

    Args:
        raw_record: Raw record dict with ``patient_cpf``,
            ``source_updated_at`` and a nested ``data`` payload.

    Returns:
        Tuple of row lists: (pacientes, telefones, cns_provisorio).
    """
    # Work on a copy: the caller persists the very same record objects to
    # the HCI database afterwards, so popping fields / rewriting
    # source_updated_at in place would corrupt what gets saved.
    raw_record = copy.deepcopy(raw_record)

    # Convert source_updated_at to string (Datalake row fields are str-typed)
    raw_record['source_updated_at'] = str(raw_record['source_updated_at'])

    # Flatten Record
    flattened_patient = flatten(raw_record)
    # NOTE(review): flatten() stringifies lists nested at depth >= 1 by
    # default, so data__telefones / data__cns_provisorio arrive at
    # SMSRioPaciente as str(list) even though the model declares
    # list[str] — confirm this validates as intended.

    # Initialize Tables
    rows = {
        "pacientes": [SMSRioPaciente(**flattened_patient)],
        "telefones": [],
        "cns_provisorio": [],
    }

    # Create Tables for List Fields
    for field_name, FieldModel in [
        ('telefones', SMSRioTelefone),
        ('cns_provisorio', SMSRioCnsProvisorio)
    ]:
        # If field not in record, skip
        if field_name not in raw_record['data']:
            continue

        # `or []` guards against the field being present but null
        for value in raw_record['data'].pop(field_name) or []:
            rows[field_name].append(
                FieldModel(
                    value=value,
                    patient_cpf=raw_record.get("patient_cpf"),
                    source_updated_at=raw_record.get("source_updated_at")
                )
            )

    return rows['pacientes'], rows['telefones'], rows['cns_provisorio']


@register_formatter(system="vitacare", entity="patientrecords")
def format_vitacare_patient(
    raw_record: dict
) -> Tuple[List[VitacarePaciente | VitacarePacienteHistorico]]:
    """Convert one raw Vitacare patient record into Datalake rows.

    Routine payloads map to ``VitacarePaciente``; historic payloads map
    to ``VitacarePacienteHistorico``.

    Returns:
        A one-element tuple containing the single-row list.
    """
    # Defensive copy — see format_smsrio_patient for rationale.
    raw_record = copy.deepcopy(raw_record)

    # Convert source_updated_at to string
    raw_record['source_updated_at'] = str(raw_record['source_updated_at'])

    # list_max_depth=0: every list value is stringified immediately
    flattened = flatten(raw_record, list_max_depth=0)

    # Temp criterium to discriminate between Routine and Historic format:
    # only historic payloads carry an 'AP' key.
    if 'AP' in raw_record['data'].keys():
        return ([VitacarePacienteHistorico(**flattened)],)
    else:
        return ([VitacarePaciente(**flattened)],)


@register_formatter(system="vitacare", entity="encounter")
def format_vitacare_encounter(
    raw_record: dict
) -> Tuple[
    List[VitacareAtendimento],
    List[VitacareCondicao],
    List[VitacareAlergia],
    List[VitacareEncaminhamento],
    List[VitacareExameSolicitado],
    List[VitacareIndicador],
    List[VitacarePrescricao],
    List[VitacareVacina],
]:
    """Convert one raw Vitacare encounter record into Datalake rows.

    The encounter's nested list fields (condicoes, alergias, ...) each
    become rows of a dedicated child table, keyed back to the encounter
    by ``atendimento_id`` (= the record's ``source_id``).

    Returns:
        Tuple of row lists, in the ``rows`` dict insertion order below.
    """
    # Defensive copy — see format_smsrio_patient for rationale.
    raw_record = copy.deepcopy(raw_record)

    # Convert source_updated_at to string
    raw_record['source_updated_at'] = str(raw_record['source_updated_at'])

    # Flatten Record (depth 3 reaches data__profissional__equipe__*)
    flattened = flatten(
        raw_record,
        dict_max_depth=3,
    )

    # Initialize Tables
    rows = {
        "encounter": [VitacareAtendimento(**flattened)],
        "condicoes": [],
        "alergias_anamnese": [],
        "encaminhamentos": [],
        "exames_solicitados": [],
        "indicadores": [],
        "prescricoes": [],
        "vacinas": [],
    }

    # Create Tables for List Fields
    for field_name, FieldModel in [
        ('condicoes', VitacareCondicao),
        ('alergias_anamnese', VitacareAlergia),
        ('encaminhamentos', VitacareEncaminhamento),
        ('exames_solicitados', VitacareExameSolicitado),
        ('indicadores', VitacareIndicador),
        ('prescricoes', VitacarePrescricao),
        ('vacinas', VitacareVacina)
    ]:
        # If field not in record, skip
        if field_name not in raw_record['data']:
            continue

        for fields in raw_record['data'].pop(field_name) or []:
            rows[field_name].append(
                FieldModel(
                    patient_cpf=raw_record.get("patient_cpf"),
                    atendimento_id=raw_record.get("source_id"),
                    source_updated_at=raw_record.get("source_updated_at"),
                    **fields
                )
            )

    return tuple(rows.values())
+# ============================================= +from typing import Optional +from pydantic import BaseModel + + +# =============== +# SMSRio +# =============== +class SMSRioPaciente(BaseModel): + patient_cpf: str + source_updated_at: str + source_id: Optional[str] + data__nome: Optional[str] + data__nome_mae: Optional[str] + data__nome_pai: Optional[str] + data__dt_nasc: Optional[str] + data__sexo: Optional[str] + data__racaCor: Optional[str] + data__nacionalidade: Optional[str] + data__obito: Optional[str] + data__dt_obito: Optional[str] + data__end_tp_logrado_cod: Optional[str] + data__end_logrado: Optional[str] + data__end_numero: Optional[str] + data__end_comunidade: Optional[str] + data__end_complem: Optional[str] + data__end_bairro: Optional[str] + data__end_cep: Optional[str] + data__cod_mun_res: Optional[str] + data__uf_res: Optional[str] + data__cod_mun_nasc: Optional[str] + data__uf_nasc: Optional[str] + data__cod_pais_nasc: Optional[str] + data__email: Optional[str] + data__timestamp: Optional[str] + data__cns_provisorio: list[str] + data__telefones: list[str] + + class Config: + dataset_id = "brutos_plataforma_smsrio" + table_id = "paciente_eventos" + partition_column = "source_updated_at" + + +class SMSRioTelefone(BaseModel): + patient_cpf: str + value: str + source_updated_at: str + + class Config: + dataset_id = "brutos_plataforma_smsrio" + table_id = "paciente_telefone_eventos" + partition_column = "source_updated_at" + + +class SMSRioCnsProvisorio(BaseModel): + patient_cpf: str + value: str + source_updated_at: str + + class Config: + dataset_id = "brutos_plataforma_smsrio" + table_id = "paciente_cns_eventos" + partition_column = "source_updated_at" + + +# =============== +# Vitacare +# =============== +class VitacarePaciente(BaseModel): + patient_cpf: str + patient_code: str + source_updated_at: str + source_id: Optional[str] + data__ap: Optional[str] + data__id: Optional[str] + data__cep: Optional[str] + data__cns: Optional[str] + data__cpf: 
Optional[str] + data__dnv: Optional[str] + data__nis: Optional[str] + data__cnes: Optional[str] + data__nome: Optional[str] + data__sexo: Optional[str] + data__email: Optional[str] + data__obito: Optional[str] + data__bairro: Optional[str] + data__equipe: Optional[str] + data__nPront: Optional[str] + data__comodos: Optional[str] + data__nomeMae: Optional[str] + data__nomePai: Optional[str] + data__racaCor: Optional[str] + data__unidade: Optional[str] + data__ocupacao: Optional[str] + data__religiao: Optional[str] + data__telefone: Optional[str] + data__ineEquipe: Optional[str] + data__microarea: Optional[str] + data__logradouro: Optional[str] + data__nomeSocial: Optional[str] + data__destinoLixo: Optional[str] + data__luzEletrica: Optional[str] + data__codigoEquipe: Optional[str] + data__dataCadastro: Optional[str] + data__escolaridade: Optional[str] + data__tempoMoradia: Optional[str] + data__nacionalidade: Optional[str] + data__rendaFamiliar: Optional[str] + data__tipoDomicilio: Optional[str] + data__dataNascimento: Optional[str] + data__paisNascimento: Optional[str] + data__tipoLogradouro: Optional[str] + data__tratamentoAgua: Optional[str] + data__emSituacaoDeRua: Optional[str] + data__frequentaEscola: Optional[str] + data__meiosTransporte: Optional[str] + data__situacaoUsuario: Optional[str] + data__doencasCondicoes: Optional[str] + data__estadoNascimento: Optional[str] + data__estadoResidencia: Optional[str] + data__identidadeGenero: Optional[str] + data__meiosComunicacao: Optional[str] + data__orientacaoSexual: Optional[str] + data__possuiFiltroAgua: Optional[str] + data__possuiPlanoSaude: Optional[str] + data__situacaoFamiliar: Optional[str] + data__territorioSocial: Optional[str] + data__abastecimentoAgua: Optional[str] + data__animaisNoDomicilio: Optional[str] + data__cadastroPermanente: Optional[str] + data__familiaLocalizacao: Optional[str] + data__emCasoDoencaProcura: Optional[str] + data__municipioNascimento: Optional[str] + data__municipioResidencia: 
Optional[str] + data__responsavelFamiliar: Optional[str] + data__esgotamentoSanitario: Optional[str] + data__situacaoMoradiaPosse: Optional[str] + data__situacaoProfissional: Optional[str] + data__vulnerabilidadeSocial: Optional[str] + data__familiaBeneficiariaCfc: Optional[str] + data__dataAtualizacaoCadastro: Optional[str] + data__participaGrupoComunitario: Optional[str] + data__relacaoResponsavelFamiliar: Optional[str] + data__membroComunidadeTradicional: Optional[str] + data__dataAtualizacaoVinculoEquipe: Optional[str] + data__familiaBeneficiariaAuxilioBrasil: Optional[str] + data__criancaMatriculadaCrechePreEscola: Optional[str] + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "paciente_eventos" + partition_column = "source_updated_at" + + +class VitacarePacienteHistorico(BaseModel): + patient_cpf: str + patient_code: str + source_updated_at: str + source_id: Optional[str] + data__AP: Optional[str] + data__SEXO: Optional[str] + data__HIST_CID: Optional[str] + data__RACA_COR: Optional[str] + data__RELIGIAO: Optional[str] + data__cpfPaciente: Optional[str] + data__ESCOLARIDADE: Optional[str] + data__dataConsulta: Optional[str] + data__NACIONALIDADE: Optional[str] + data__FREQUENTA_ESCOLA: Optional[str] + data__SITUACAO_USUARIO: Optional[str] + data__TELEFONE_CONTATO: Optional[str] + data__dataNascPaciente: Optional[str] + data__SITUACAO_FAMILIAR: Optional[str] + data__TERRITORIO_SOCIAL: Optional[str] + data__NUMERO_CNES_UNIDADE: Optional[str] + data__N_DE_CONSULTAS_2018: Optional[str] + data__N_DE_CONSULTAS_2019: Optional[str] + data__N_DE_CONSULTAS_2020: Optional[str] + data__N_DE_CONSULTAS_2021: Optional[str] + data__N_DE_CONSULTAS_2022: Optional[str] + data__N_DE_CONSULTAS_2023: Optional[str] + data__PACIENTE_TEMPORARIO: Optional[str] + data__NOME_UNIDADE_DE_SAUDE: Optional[str] + data__POSSUI_PLANO_DE_SAUDE: Optional[str] + data__SITUACAO_PROFISSIONAL: Optional[str] + data__MUNICIPIO_DE_NASCIMENTO: Optional[str] + 
data__N_DE_PROCEDIMENTOS_2018: Optional[str] + data__N_DE_PROCEDIMENTOS_2019: Optional[str] + data__N_DE_PROCEDIMENTOS_2020: Optional[str] + data__N_DE_PROCEDIMENTOS_2021: Optional[str] + data__N_DE_PROCEDIMENTOS_2022: Optional[str] + data__N_DE_PROCEDIMENTOS_2023: Optional[str] + data__PACIENTE_SITUACAO_RUA: Optional[str] + data__CODIGO_DA_EQUIPE_DE_SAUDE: Optional[str] + data__NOME_DA_PESSOA_CADASTRADA: Optional[str] + data__N_CNS_DA_PESSOA_CADASTRADA: Optional[str] + data__NOME_DA_MAE_PESSOA_CADASTRADA: Optional[str] + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "paciente_historico_eventos" + partition_column = "source_updated_at" + + +class VitacareAtendimento(BaseModel): + patient_cpf: str + patient_code: str + source_updated_at: str + source_id: str + id: Optional[str] + data__unidade_ap: Optional[str] + data__unidade_cnes: Optional[str] + data__profissional__cns: Optional[str] + data__profissional__cpf: Optional[str] + data__profissional__nome: Optional[str] + data__profissional__cbo: Optional[str] + data__profissional__equipe__nome: Optional[str] + data__profissional__equipe__cod_equipe: Optional[str] + data__profissional__equipe__cod_ine: Optional[str] + data__data_consulta: Optional[str] + data__tipo_consulta: Optional[str] + data__eh_coleta: Optional[str] + data__motivo: Optional[str] + data__observacao: Optional[str] + data__avaliacao: Optional[str] + data__evolucao: Optional[str] + data__observacoes_atendimento: Optional[str] + data__condicoes: Optional[str] + data__prescricoes: Optional[str] + data__exames_solicitados: Optional[str] + data__vacinas: Optional[str] + data__alergias_anamnese: Optional[str] + data__indicadores: Optional[str] + data__encaminhamentos: Optional[str] + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "atendimento_eventos" + partition_column = "source_updated_at" + + +class VitacareCondicao(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + 
cod_cid10: str + cod_ciap2: Optional[str] + estado: str + data_diagnostico: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "condicoes_eventos" + partition_column = "source_updated_at" + + +class VitacareAlergia(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + descricao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "alergia_anamnese_eventos" + partition_column = "source_updated_at" + + +class VitacareEncaminhamento(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + descricao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "encaminhamento_eventos" + partition_column = "source_updated_at" + + +class VitacarePrescricao(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome_medicamento: str + cod_medicamento: str + quantidade: str + uso_continuado: bool + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "prescricao_eventos" + partition_column = "source_updated_at" + + +class VitacareExameSolicitado(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome_exame: str + cod_exame: str + quantidade: str + material: str + url_resultado: Optional[str] + data_solicitacao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "exames_solicitados_eventos" + partition_column = "source_updated_at" + + +class VitacareVacina(BaseModel): + patient_cpf: str + atendimento_id: str + source_updated_at: str + nome_vacina: str + cod_vacina: str + dose: str + lote: str + datahora_aplicacao: str + datahora_registro: str + diff: str + calendario_vacinal_atualizado: bool + dose_vtc: str + tipo_registro: str + estrategia_imunizacao: str + + class Config: + dataset_id = "brutos_prontuario_vitacare" + table_id = "vacinas_eventos" + partition_column = "source_updated_at" + + +class VitacareIndicador(BaseModel): + patient_cpf: str + 
import copy
import re

import pandas as pd

try:
    from loguru import logger
except ImportError:  # pragma: no cover
    # Fallback keeps the module importable (e.g. in unit-test or
    # minimal environments) when loguru is not installed.
    import logging
    logger = logging.getLogger(__name__)


# Registry mapping (system, entity) -> formatter function, populated by
# the @register_formatter decorator at import time of datalake.formatters.
REGISTERED_FORMATTERS = {}


def register_formatter(system: str, entity: str):
    """
    Decorator function to register a formatter for a specific system and entity.

    Args:
        system (str): The name of the system.
        entity (str): The name of the entity.

    Returns:
        function: The decorated function, unchanged.
    """
    def decorator(func):
        logger.info(
            f"Registering formatter for {system} - {entity}: {func.__name__}")
        REGISTERED_FORMATTERS[(system, entity)] = func
        return func
    return decorator


def get_formatter(system: str, entity: str):
    """
    Retrieves the formatter function for the specified system and entity.

    Args:
        system (str): The name of the system.
        entity (str): The name of the entity.

    Returns:
        function | None: The registered formatter, or None (with a
        warning logged) when no formatter exists for the pair.
    """
    formatter = REGISTERED_FORMATTERS.get((system, entity))
    if not formatter:
        logger.warning(f"No formatter implemented for ({system},{entity})")
    return formatter


def flatten(
    record: dict,
    dict_max_depth: int = 2,
    list_max_depth: int = 1,
    depth: int = 0,
) -> dict:
    """
    Flatten a nested dictionary by concatenating keys with '__' separator.

    Nested dicts deeper than ``dict_max_depth`` and lists at depth >=
    ``list_max_depth`` are stringified with ``str()`` instead of being
    expanded.

    Args:
        record (dict): The nested dictionary to be flattened.
        dict_max_depth (int, optional): The maximum depth to flatten dictionaries. Defaults to 2.
        list_max_depth (int, optional): The maximum depth to flatten lists. Defaults to 1.
        depth (int, optional): The current depth of recursion. Defaults to 0.

    Returns:
        dict: The flattened dictionary (a new dict; input is not modified).
    """
    updated_record = {}
    for field, content in record.items():
        if isinstance(content, dict):
            if depth < dict_max_depth:
                flattened = flatten(
                    content,
                    depth=depth + 1,
                    dict_max_depth=dict_max_depth,
                    list_max_depth=list_max_depth
                )
                for key, value in flattened.items():
                    updated_record[f"{field}__{key}"] = value
            else:
                updated_record[field] = str(content)
        elif isinstance(content, list) and depth >= list_max_depth:
            updated_record[field] = str(content)
        else:
            updated_record[field] = content

    return updated_record


def apply_formatter(records: list[dict], formatter) -> dict:
    """
    Applies a formatter function to a list of records and returns a dictionary of formatted tables.

    Args:
        records (list[dict]): A list of records to be formatted. The
            records are NOT mutated: each one is deep-copied before being
            handed to the formatter.
        formatter (function): A formatter function that takes a record as input and returns a list of row sets.

    Returns:
        dict: A dictionary where the keys are the rows' Config classes
        (carrying dataset_id/table_id/partition_column) and the values
        are pandas DataFrames containing the formatted rows.
    """
    tables = {}

    for record in records:
        try:
            # Deep-copy: formatters pop()/rewrite fields of the record
            # they receive, and the caller persists these same records
            # to the database afterwards.
            formatted_record = formatter(copy.deepcopy(record))
        except Exception as e:
            logger.error(f"An error occurred during the process {e}")
            raise

        # Group rows by destination table (row.Config identifies it).
        for row_set in formatted_record:
            for row in row_set:
                tables.setdefault(row.Config, []).append(row)

    # Materialize each table's rows into a DataFrame.
    for table_config, rows in tables.items():
        tables[table_config] = pd.DataFrame([row.dict() for row in rows])

    return tables