Sending Raw Data to Datalake #157

Merged
merged 42 commits into from
Jul 26, 2024
42 commits
6c638d0
chore: Including basedosdados and upload_to_datalake function
TanookiVerde Jul 22, 2024
3baf859
feat: sending data to datalake
TanookiVerde Jul 22, 2024
b21abf7
Merge branch 'main' into send_data_to_datalake
TanookiVerde Jul 22, 2024
887c758
fix: parquet wrong casting to timestamp; implement new unnesters
TanookiVerde Jul 22, 2024
0adc858
fix(lint): unused imports
TanookiVerde Jul 22, 2024
d1b1b40
chore: Add option to upload data to datalake in create_raw_data function
TanookiVerde Jul 22, 2024
24b0740
fix: tests don't send to datalake
TanookiVerde Jul 22, 2024
3b56a76
Merge pull request #153 from prefeitura-rio/send_data_to_datalake
TanookiVerde Jul 22, 2024
c224639
fix: prepare gcp credentials
TanookiVerde Jul 22, 2024
a280e1c
Merge pull request #154 from prefeitura-rio/send_data_to_datalake
TanookiVerde Jul 22, 2024
0ee63e3
fix: config of basedosdados
TanookiVerde Jul 22, 2024
427a355
Merge pull request #155 from prefeitura-rio/send_data_to_datalake
TanookiVerde Jul 22, 2024
7b2bbd7
fix: Copy payloads before iterating in unnester functions
TanookiVerde Jul 22, 2024
2df25a3
Merge pull request #156 from prefeitura-rio/send_data_to_datalake
TanookiVerde Jul 22, 2024
5b9da0c
feat: rename class + upload folder not files
TanookiVerde Jul 24, 2024
336489c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 24, 2024
fd5f41e
feat: remove unused import in datalake.py
TanookiVerde Jul 24, 2024
b7b493c
Merge pull request #158 from prefeitura-rio/feat/improved-datalake-up…
TanookiVerde Jul 24, 2024
7cf6b35
Refactor Datalake Sending logic
TanookiVerde Jul 25, 2024
68b60a2
feat: implementing encounter formatters
TanookiVerde Jul 25, 2024
7e254e9
chore: automatically reformat file
TanookiVerde Jul 25, 2024
310e678
chore: quick docs in datalake files
TanookiVerde Jul 25, 2024
bb59558
feat: rollback of upload_to_datalake option
TanookiVerde Jul 25, 2024
69c4929
feat: Update cod_ciap2 field to be optional in VitacareCondicao model
TanookiVerde Jul 25, 2024
43f6331
feat: Refactor Datalake Sending logic and improve error handling
TanookiVerde Jul 25, 2024
0e863aa
feat: Fix get_formatter function in datalake/utils.py
TanookiVerde Jul 25, 2024
9f23398
Fix get_formatter function in datalake/utils.py
TanookiVerde Jul 25, 2024
bbceae3
feat: compatibility with historic vitacare data
TanookiVerde Jul 25, 2024
53f8e7d
Merge pull request #159 from prefeitura-rio/feat/improved-datalake-up…
TanookiVerde Jul 25, 2024
bb0a13f
fix: improvements and general fixes
TanookiVerde Jul 26, 2024
76d30ba
feat: Add error handling and cleanup logic to DatalakeUploader
TanookiVerde Jul 26, 2024
c86422d
feat: Improve cleanup logic in DatalakeUploader
TanookiVerde Jul 26, 2024
a6ac1eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 26, 2024
f77ba96
Merge pull request #160 from prefeitura-rio/feat/improved-datalake-up…
TanookiVerde Jul 26, 2024
93b0491
fix(lint): minor lint issues
TanookiVerde Jul 26, 2024
0b5bc34
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 26, 2024
5598367
Merge pull request #161 from prefeitura-rio/feat/improved-datalake-up…
TanookiVerde Jul 26, 2024
e5140f4
fix: folders outside "app" are not available in container
TanookiVerde Jul 26, 2024
38ffceb
Merge pull request #162 from prefeitura-rio/feat/improved-datalake-up…
TanookiVerde Jul 26, 2024
c50da07
feat: Add VitacareProcedimentosClinicos model and update formatters
TanookiVerde Jul 26, 2024
2ce258f
feat: Enable upload to datalake in create_raw_data endpoint
TanookiVerde Jul 26, 2024
bb54dd8
Merge pull request #163 from prefeitura-rio/feat/updating-vitacare-en…
TanookiVerde Jul 26, 2024
5 changes: 5 additions & 0 deletions api/app/datalake/__init__.py
@@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
# pylint: disable=W0401, W0614, W0611
# flake8: noqa: F401, F403
from app.datalake.utils import register_formatter
from app.datalake.formatters import *
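
The star import above matters because each formatter registers itself through a decorator at import time. The contents of `app/datalake/utils.py` are not shown in this diff, so the following is only a minimal sketch of how such a registry could work. The `_FORMATTERS` dictionary, the `get_formatter` signature, and the `demo` system name are assumptions made for illustration, not the project's actual implementation.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical registry keyed by (system, entity); the real storage in
# app/datalake/utils.py may look different.
_FORMATTERS: Dict[Tuple[str, str], Callable[[dict], List]] = {}


def register_formatter(system: str, entity: str):
    """Return a decorator that stores the function under (system, entity)."""

    def decorator(func: Callable[[dict], List]) -> Callable[[dict], List]:
        _FORMATTERS[(system, entity)] = func
        return func  # the function itself is left unchanged

    return decorator


def get_formatter(system: str, entity: str) -> Optional[Callable[[dict], List]]:
    """Look up a formatter; None means nothing was registered (assumed behaviour)."""
    return _FORMATTERS.get((system, entity))


# Registration happens as a side effect of defining (importing) the function.
@register_formatter(system="demo", entity="patientrecords")
def format_demo_patient(raw_record: dict) -> List:
    return [raw_record]
```

With a registry like this, a module that is never imported never registers its formatters, which is one plausible reason for the package `__init__.py` to import `app.datalake.formatters` eagerly.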
97 changes: 97 additions & 0 deletions api/app/datalake/formatters.py
@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# =============================================
# Formatters that are responsible for converting
# raw JSON records to Datalake table rows.
# =============================================
from typing import List
from app.datalake.utils import flatten, register_formatter
from app.datalake.models import (
    SMSRioCnsProvisorio,
    SMSRioPaciente,
    SMSRioTelefone,
    VitacarePaciente,
    VitacarePacienteHistorico,
    VitacareAtendimento,
    VitacareCondicao,
    VitacareAlergia,
    VitacareEncaminhamento,
    VitacareExameSolicitado,
    VitacareIndicador,
    VitacarePrescricao,
    VitacareVacina,
    VitacareProcedimentosClinicos
)


@register_formatter(system="smsrio", entity="patientrecords")
def format_smsrio_patient(raw_record: dict) -> List:
    raw_record['source_updated_at'] = str(raw_record['source_updated_at'])

    flattened_patient = flatten(raw_record)
    rows = [SMSRioPaciente(**flattened_patient)]

    for field_name, FieldModel in [
        ('telefones', SMSRioTelefone),
        ('cns_provisorio', SMSRioCnsProvisorio)
    ]:
        # If field not in record, skip
        if field_name not in raw_record['data']:
            continue

        for value in raw_record['data'].pop(field_name) or []:
            rows.append(
                FieldModel(
                    value=value,
                    patient_cpf=raw_record.get("patient_cpf"),
                    source_updated_at=raw_record.get("source_updated_at")
                )
            )

    return rows


@register_formatter(system="vitacare", entity="patientrecords")
def format_vitacare_patient(raw_record: dict) -> List:
    raw_record['source_updated_at'] = str(raw_record['source_updated_at'])

    flattened = flatten(raw_record, list_max_depth=0)

    # Temporary criterion to discriminate between Routine and Historic format:
    # only historic records carry an 'AP' key in their data payload.
    if 'AP' in raw_record['data']:
        return [VitacarePacienteHistorico(**flattened)]
    return [VitacarePaciente(**flattened)]


@register_formatter(system="vitacare", entity="encounter")
def format_vitacare_encounter(raw_record: dict) -> List:
    raw_record['source_updated_at'] = str(raw_record['source_updated_at'])

    flattened = flatten(raw_record, dict_max_depth=3)

    rows = [VitacareAtendimento(**flattened)]

    for field_name, FieldModel in [
        ('condicoes', VitacareCondicao),
        ('alergias_anamnese', VitacareAlergia),
        ('encaminhamentos', VitacareEncaminhamento),
        ('exames_solicitados', VitacareExameSolicitado),
        ('indicadores', VitacareIndicador),
        ('prescricoes', VitacarePrescricao),
        ('vacinas', VitacareVacina),
        ('procedimentosClinicos', VitacareProcedimentosClinicos),
    ]:
        if field_name not in raw_record['data']:
            continue

        for fields in raw_record['data'].pop(field_name) or []:
            rows.append(
                FieldModel(
                    patient_cpf=raw_record.get("patient_cpf"),
                    atendimento_id=raw_record.get("source_id"),
                    source_updated_at=raw_record.get("source_updated_at"),
                    **fields
                )
            )

    return rows
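
The formatters above share one pattern: build a parent row from the record, then pop each nested list out of `data` and emit one child row per item, tagged with the parent's identifiers. The sketch below isolates that pattern so it can be run without the project's models. `ParentRow`, `ChildRow`, `unnest`, and the `descricao` field are hypothetical stand-ins, and the `deepcopy` is an illustrative choice so the caller's record is not mutated by `pop`; it is not a claim about how the project handles this.

```python
import copy
from dataclasses import dataclass
from typing import List


@dataclass
class ParentRow:  # hypothetical stand-in for a model such as VitacareAtendimento
    source_id: str
    patient_cpf: str


@dataclass
class ChildRow:  # hypothetical stand-in for a model such as VitacareCondicao
    atendimento_id: str
    patient_cpf: str
    descricao: str


def unnest(raw_record: dict, field_name: str = "condicoes") -> List:
    """Return one parent row followed by one child row per nested item."""
    # Work on a copy so that pop() below does not alter the caller's dict.
    record = copy.deepcopy(raw_record)

    rows: List = [
        ParentRow(source_id=record["source_id"], patient_cpf=record["patient_cpf"])
    ]

    # `or []` covers both a missing field and a field explicitly set to None.
    for fields in record["data"].pop(field_name, None) or []:
        rows.append(
            ChildRow(
                atendimento_id=record["source_id"],
                patient_cpf=record["patient_cpf"],
                **fields,
            )
        )
    return rows
```

Calling `unnest` on a record with two entries under `condicoes` yields three rows: the parent and two children that each carry the parent's `source_id` as `atendimento_id`.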