feat: sending data to datalake
TanookiVerde committed Jul 22, 2024
1 parent 6c638d0 commit 3baf859
Showing 3 changed files with 280 additions and 117 deletions.
186 changes: 186 additions & 0 deletions api/app/datalake.py
@@ -0,0 +1,186 @@
# -*- coding: utf-8 -*-
import os
import uuid
import glob
from typing import Optional

import pandas as pd
import basedosdados as bd

from loguru import logger


class DataLakeUploader:
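    """Write pandas DataFrames to local parquet files and push them to the
    data lake (GCS staging + BigQuery) through the basedosdados client."""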

def __init__(
self,
if_exists: str = "append",
if_storage_data_exists: str = "replace",
biglake_table: bool = True,
dataset_is_public: bool = False,
dump_mode: str = "append",
csv_delimiter: str = ";",
force_unique_file_name: bool = False,
) -> None:
self.if_exists = if_exists
self.if_storage_data_exists = if_storage_data_exists
self.biglake_table = biglake_table
self.dataset_is_public = dataset_is_public
self.dump_mode = dump_mode
self.csv_delimiter = csv_delimiter
self.force_unique_file_name = force_unique_file_name

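        # Local scratch directory where parquet files are staged before upload.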
        self._base_path = os.path.join(os.getcwd(), "files")

def _split_dataframe_per_day(
self,
df: pd.DataFrame,
date_column: str,
) -> list[tuple[pd.Timestamp, pd.DataFrame]]:
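        # Return one (day, dataframe) pair per distinct date in `date_column`;
        # an empty dataframe still produces a single pair for today so that a
        # header-only file can be sent.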
now = pd.Timestamp.now(tz="America/Sao_Paulo")

if df.empty:
logger.warning("Empty dataframe. Preparing to send file with only headers")
dfs = [(str(now.date()), df)]
else:
logger.warning("Non Empty dataframe. Splitting Dataframe in multiple files by day")
df["partition_date"] = pd.to_datetime(df[date_column]).dt.date
days = df["partition_date"].unique()
dfs = [
(
pd.Timestamp(day),
df[df["partition_date"] == day].drop(columns=["partition_date"]),
)
for day in days
] # noqa

return dfs

def _create_file_name(
self,
table_id: str,
unique: bool = False
) -> str:
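        # An optional UUID suffix keeps files from separate uploads from overwriting each other.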
if unique:
return f"{table_id}-{uuid.uuid4()}.parquet"
else:
return f"{table_id}.parquet"

def _upload_file(
self,
input_path: str,
dataset_id: str,
table_id: str,
source_format: str = "parquet"
) -> None:
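        # Create the staging table on first upload; afterwards either append new data
        # or, in "overwrite" mode, delete the stored data and recreate the table.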
tb = bd.Table(dataset_id=dataset_id, table_id=table_id)
table_staging = f"{tb.table_full_name['staging']}"

st = bd.Storage(dataset_id=dataset_id, table_id=table_id)
storage_path = f"{st.bucket_name}.staging.{dataset_id}.{table_id}"
storage_path_link = (
f"https://console.cloud.google.com/storage/browser/{st.bucket_name}"
f"/staging/{dataset_id}/{table_id}"
)

try:
table_exists = tb.table_exists(mode="staging")

if not table_exists:
logger.info(f"CREATING TABLE: {dataset_id}.{table_id}")
tb.create(
path=input_path,
source_format=source_format,
csv_delimiter=self.csv_delimiter,
if_storage_data_exists=self.if_storage_data_exists,
biglake_table=self.biglake_table,
dataset_is_public=self.dataset_is_public,
)
else:
if self.dump_mode == "append":
logger.info(
f"TABLE ALREADY EXISTS APPENDING DATA TO STORAGE: {dataset_id}.{table_id}"
)

tb.append(filepath=input_path, if_exists=self.if_exists)
elif self.dump_mode == "overwrite":
logger.info(
"MODE OVERWRITE: Table ALREADY EXISTS, DELETING OLD DATA!\n"
f"{storage_path}\n"
f"{storage_path_link}"
) # pylint: disable=C0301
st.delete_table(mode="staging", bucket_name=st.bucket_name, not_found_ok=True)
logger.info(
"MODE OVERWRITE: Sucessfully DELETED OLD DATA from Storage:\n"
f"{storage_path}\n"
f"{storage_path_link}"
) # pylint: disable=C0301
tb.delete(mode="all")
logger.info(
"MODE OVERWRITE: Sucessfully DELETED TABLE:\n" f"{table_staging}\n"
) # pylint: disable=C0301

tb.create(
path=input_path,
source_format=source_format,
csv_delimiter=self.csv_delimiter,
if_storage_data_exists=self.if_storage_data_exists,
biglake_table=self.biglake_table,
dataset_is_public=self.dataset_is_public,
)
logger.info("Data uploaded to BigQuery")

        except Exception as e:  # pylint: disable=W0703
            logger.error(f"An error occurred: {e}")
            raise RuntimeError("Failed to upload data to the data lake") from e

def upload(
self,
dataframe: pd.DataFrame,
dataset_id: str,
table_id: str,
partition_by_date: bool = False,
partition_column: Optional[str] = None,
**kwargs
) -> None:
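        # Write the dataframe to parquet in a unique local folder (one file per day
        # when partitioning), upload every file produced, then delete the local copies.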
upload_id = uuid.uuid4()
        upload_folder = os.path.join(self._base_path, str(upload_id))
        os.makedirs(upload_folder, exist_ok=True)

# Override values in self with kwargs
for key, value in kwargs.items():
if hasattr(self, key):
setattr(self, key, value)
else:
raise ValueError(f"Invalid parameter: {key}")

if partition_by_date:
if partition_column is None:
raise ValueError("partition_column must be provided when partition_by_date is True")

for partition_date, dataframe in self._split_dataframe_per_day(
dataframe, date_column=partition_column
):
year = int(partition_date.strftime("%Y"))
month = int(partition_date.strftime("%m"))
day = partition_date.strftime("%Y-%m-%d")

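                # Hive-style partition folders:
                # ano_particao=YYYY/mes_particao=M/data_particao=YYYY-MM-DD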
partition_folder = f"ano_particao={year}/mes_particao={month}/data_particao={day}"

folder_path = os.path.join(upload_folder, partition_folder)
os.makedirs(folder_path, exist_ok=True)

dataframe.to_parquet(
os.path.join(
folder_path, self._create_file_name(table_id, self.force_unique_file_name)
)
)
else:
dataframe.to_parquet(
os.path.join(
upload_folder, self._create_file_name(table_id, self.force_unique_file_name)
)
)

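        # Send every parquet file written above to the staging table, removing each
        # local file once it has been uploaded.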
for file in glob.glob(f"{upload_folder}/**/*.parquet", recursive=True):
self._upload_file(file, dataset_id, table_id)
os.remove(file)
77 changes: 57 additions & 20 deletions api/app/routers/entities_raw.py
@@ -1,35 +1,49 @@
# -*- coding: utf-8 -*-
import asyncpg
import pandas as pd

from datetime import (
datetime as dt,
timedelta as td,
)
from typing import Annotated, Literal
import asyncpg
from fastapi import APIRouter, Depends
from fastapi.responses import HTMLResponse
from tortoise.exceptions import ValidationError

from app.pydantic_models import (
RawDataListModel,
BulkInsertOutputModel,
RawDataModel
)
from app.pydantic_models import RawDataListModel, BulkInsertOutputModel, RawDataModel
from app.dependencies import get_current_active_user
from app.models import (
User,
RawPatientRecord,
RawPatientCondition,
DataSource,
RawEncounter
from app.models import User, RawPatientRecord, RawPatientCondition, DataSource, RawEncounter
from app.enums import SystemEnum
from app.datalake import DataLakeUploader
from app.utils import (
unnester_encounter,
unnester_patientconditions,
unnester_patientrecords
)


router = APIRouter(prefix="/raw", tags=["Entidades RAW (Formato Raw/Bruto)"])

entity_from_name = {
"patientrecords": RawPatientRecord,
"patientconditions": RawPatientCondition,
"encounter": RawEncounter
entities_config = {
"patientrecords": {
"class": RawPatientRecord,
"unnester": unnester_patientrecords,
},
"patientconditions": {
"class": RawPatientCondition,
"unnester": unnester_patientconditions,
},
"encounter": {
"class": RawEncounter,
"unnester": unnester_encounter,
},
}

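# Each source system writes to its own raw ("brutos") dataset in the data lake.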
datalake_config = {
SystemEnum.VITACARE: "brutos_prontuario_vitacare",
SystemEnum.VITAI: "brutos_prontuario_vitai",
SystemEnum.SMSRIO: "brutos_plataforma_smsrio",
}


@@ -43,7 +57,7 @@ async def get_raw_data(
datasource_system: Literal["vitai", "vitacare", "smsrio"] = None,
) -> list[RawDataModel]:

Entity = entity_from_name.get(entity_name)
Entity = entities_config[entity_name]["class"]

if filter_type == "fromEventDatetime":
filtered = Entity.filter(
@@ -74,12 +88,35 @@ async def create_raw_data(
raw_data: RawDataListModel,
) -> BulkInsertOutputModel:

Entity = entity_from_name.get(entity_name)
Entity = entities_config[entity_name]["class"]
unnester = entities_config[entity_name]["unnester"]

raw_data = raw_data.dict()
cnes = raw_data.pop("cnes")
records = raw_data.pop("data_list")

# Get DataSource
data_source = await DataSource.get(cnes=cnes)

# Configure Datalake Uploader
uploader = DataLakeUploader(
biglake_table=True,
dataset_is_public=False,
dump_mode="append",
force_unique_file_name=True,
)

# Upload to Datalake
for name, dataframe in unnester(records):
uploader.upload(
dataframe=dataframe,
dataset_id=datalake_config[data_source.system],
table_id=f"{name}_eventos",
partition_by_date=True,
partition_column="updated_at",
)

# Send to HCI Database
try:
records_to_create = []
for record in records:
@@ -109,5 +146,5 @@ async def set_as_invalid_flag_records(
entity_name: Literal["patientrecords", "patientconditions", "encounter"],
raw_record_id_list: list[str],
):
Entity = entity_from_name.get(entity_name)
await Entity.filter(id__in=raw_record_id_list).update(is_valid=False)
Entity = entities_config[entity_name]["class"]
await Entity.filter(id__in=raw_record_id_list).update(is_valid=False)