add procrastinate to generate parquets
nicokant committed May 28, 2024
1 parent 749a3b4 commit 2eb3d3c
Showing 9 changed files with 849 additions and 25 deletions.
578 changes: 578 additions & 0 deletions db/schema.sql

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions docker-compose.yml
@@ -23,20 +23,34 @@ services:
       PGRST_JWT_SECRET: ${JWT_SECRET}
       PGRST_DB_ANON_ROLE: web_anon
   wizard:
+    image: seapop_wizard
     build:
       context: wizard
     environment:
       POSTGREST_URL: http://postgrest:3000
       POSTGREST_TOKEN: ${WRITER_TOKEN}
       LOGGING: DEBUG
       TEST_DATA_PATH: /test_data
+      DATABASE_URL: postgres://postgres:${POSTGRES_PASSWORD}@postgres:5432/postgres?sslmode=disable
     tty: true
     stdin_open: true
     volumes:
       - ./loggers_data:/app/loggers_data
       - ./data:/data/
       - ./wizard/parsers:/app/parsers
       - ./test_data:/test_data
+  queue:
+    image: seapop_wizard
+    command: pdm run procrastinate --app=tasks.app worker
+    environment:
+      POSTGREST_URL: http://postgrest:3000
+      POSTGREST_TOKEN: ${WRITER_TOKEN}
+      LOGGING: DEBUG
+      TEST_DATA_PATH: /test_data
+      DATABASE_URL: postgres://postgres:${POSTGRES_PASSWORD}@postgres:5432/postgres?sslmode=disable
+    volumes:
+      - ./data:/data/
+      - ./wizard/parsers:/app/parsers
   pgweb:
     image: sosedoff/pgweb:latest
     environment:
3 changes: 2 additions & 1 deletion wizard/Dockerfile
@@ -9,9 +9,10 @@ COPY pyproject.toml pdm.lock .
 RUN --mount=type=cache,target=/root/.cache/pdm \
     pdm install --no-self

-COPY wizard.py parser.py .
+COPY wizard.py parser.py tasks.py entrypoint.sh .
 COPY parsers/ parsers/
 COPY templates/ templates/

 EXPOSE 8000/TCP
+ENTRYPOINT [ "./entrypoint.sh" ]
 CMD ["pdm", "run", "./wizard.py"]
8 changes: 8 additions & 0 deletions wizard/entrypoint.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+set -ex
+
+export PYTHONPATH=.
+pdm run procrastinate --app=tasks.app schema --apply || true
+
+exec "$@"
6 changes: 2 additions & 4 deletions wizard/parsers/parser_base.py
@@ -95,12 +95,10 @@ def as_table(self) -> pa.Table:
     def write_parquet(self, path: pathlib.Path, filename: str = None):
         if filename:
             filename = pathlib.Path(filename)
-        if not filename and self.stream.name:
-            filename = pathlib.Path(self.stream.name)
         else:
-            raise Exception("Stream has no name and no filename is provided")
+            filename = self.file._file_path.name

-        pq.write_table(self.as_table(), str(path / f'{filename.stem}.parquet'))
+        pq.write_table(self.as_table(), str(path / f'{filename}.parquet'))

     def write_csv(self, path):
         pacsv.write_csv(self.as_table(), str(path))
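
The `write_parquet` change replaces `filename.stem` with the full name, so the source extension now stays in the output name. A minimal sketch of that naming rule, assuming the fallback `self.file._file_path.name` yields a bare file name such as `logger_01.csv`; `parquet_target` is a hypothetical helper for illustration and is not part of the commit:

```python
import pathlib
from typing import Optional


def parquet_target(out_dir: pathlib.PurePosixPath,
                   source_name: str,
                   filename: Optional[str] = None) -> pathlib.PurePosixPath:
    """Mirror the post-commit naming rule: '<name>.parquet', extension kept."""
    # An explicit filename wins; otherwise fall back to the source file's name.
    name = pathlib.PurePosixPath(filename) if filename else source_name
    return out_dir / f"{name}.parquet"


out = pathlib.PurePosixPath("/data/parquet")
print(parquet_target(out, "logger_01.csv"))
print(parquet_target(out, "logger_01.csv", filename="renamed.csv"))
```

Keeping the extension in the output name matches the lookup in `check_missing` in `wizard/tasks.py`, which tests for `f.name + '.parquet'`.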
187 changes: 185 additions & 2 deletions wizard/pdm.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions wizard/pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
     "pandas>=2.0.3",
     "chardet>=5.2.0",
     "sentry-sdk>=1.40.5",
+    "procrastinate",
 ]
 requires-python = ">=3.8"
 license = {text = "GPLv3"}
37 changes: 37 additions & 0 deletions wizard/tasks.py
@@ -0,0 +1,37 @@
+import os
+import pathlib
+from procrastinate import App, PsycopgConnector
+
+from parsers.parser import detect_file
+
+DATABASE_URL = os.getenv("DATABASE_URL")
+
+DATA_PATH = pathlib.Path(os.getenv("DATA_PATH", '/data/'))
+
+LOGGERS_PATH = DATA_PATH / 'loggers'
+SPREADSHEETS_PATH = DATA_PATH / 'metadata'
+PARQUET_PATH = DATA_PATH / 'parquet'
+
+if not DATABASE_URL:
+    raise Exception('Missing DATABASE_URL')
+
+app = App(
+    connector=PsycopgConnector(
+        conninfo=DATABASE_URL
+    )
+)
+
+
+@app.task(name='to_parquet')
+def to_parquet(file_path: str):
+    parser = detect_file(path=pathlib.Path(file_path))
+    parser.write_parquet(PARQUET_PATH)
+
+
+@app.periodic(cron="* * * * *")
+@app.task(name='check_missing')
+def check_missing(timestamp: int):
+    for f in LOGGERS_PATH.iterdir():
+        if not (PARQUET_PATH / (f.name + '.parquet')).exists():
+            print(f.name + ' not found, adding a task to generate it')
+            to_parquet.configure(lock=f.name).defer(file_path=str(f))
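
The directory scan inside `check_missing` can be tried without a database or a worker. A sketch under stated assumptions: `find_missing` is a hypothetical helper that reproduces only the comparison between the two directories, and the temporary directories stand in for `LOGGERS_PATH` and `PARQUET_PATH`.

```python
import pathlib
import tempfile
from typing import List


def find_missing(loggers_path: pathlib.Path,
                 parquet_path: pathlib.Path) -> List[pathlib.Path]:
    """Return logger files that have no '<name>.parquet' counterpart."""
    return sorted(
        f for f in loggers_path.iterdir()
        if not (parquet_path / (f.name + ".parquet")).exists()
    )


with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    loggers = root / "loggers"
    parquet = root / "parquet"
    loggers.mkdir()
    parquet.mkdir()
    # Two logger files, only one of which has already been converted.
    (loggers / "a.csv").touch()
    (loggers / "b.csv").touch()
    (parquet / "a.csv.parquet").touch()
    missing_names = [f.name for f in find_missing(loggers, parquet)]
    print(missing_names)
```

In the commit, each missing file is deferred with `lock=f.name`. As far as procrastinate's documentation describes it, `lock` keeps jobs that share a value from running concurrently; it does not by itself appear to stop the same file from being enqueued again by the next one-minute run while an earlier job is still pending.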