diff --git a/db/schema.sql b/db/schema.sql index 58da51e..139c9da 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -93,6 +93,33 @@ CREATE EXTENSION IF NOT EXISTS postgis_topology WITH SCHEMA topology; COMMENT ON EXTENSION postgis_topology IS 'PostGIS topology spatial types and functions'; +-- +-- Name: procrastinate_job_event_type; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.procrastinate_job_event_type AS ENUM ( + 'deferred', + 'started', + 'deferred_for_retry', + 'failed', + 'succeeded', + 'cancelled', + 'scheduled' +); + + +-- +-- Name: procrastinate_job_status; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.procrastinate_job_status AS ENUM ( + 'todo', + 'doing', + 'succeeded', + 'failed' +); + + -- -- Name: import(); Type: FUNCTION; Schema: public; Owner: - -- @@ -583,6 +610,329 @@ end; $$; +-- +-- Name: procrastinate_defer_job(character varying, character varying, text, text, jsonb, timestamp with time zone); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_defer_job(queue_name character varying, task_name character varying, lock text, queueing_lock text, args jsonb, scheduled_at timestamp with time zone) RETURNS bigint + LANGUAGE plpgsql + AS $$ +DECLARE + job_id bigint; +BEGIN + INSERT INTO procrastinate_jobs (queue_name, task_name, lock, queueing_lock, args, scheduled_at) + VALUES (queue_name, task_name, lock, queueing_lock, args, scheduled_at) + RETURNING id INTO job_id; + + RETURN job_id; +END; +$$; + + +-- +-- Name: procrastinate_defer_periodic_job(character varying, character varying, character varying, character varying, character varying, bigint, jsonb); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_defer_periodic_job(_queue_name character varying, _lock character varying, _queueing_lock character varying, _task_name character varying, _periodic_id character varying, _defer_timestamp bigint, _args jsonb) RETURNS bigint + LANGUAGE plpgsql + AS 
$$ +DECLARE + _job_id bigint; + _defer_id bigint; +BEGIN + + INSERT + INTO procrastinate_periodic_defers (task_name, periodic_id, defer_timestamp) + VALUES (_task_name, _periodic_id, _defer_timestamp) + ON CONFLICT DO NOTHING + RETURNING id into _defer_id; + + IF _defer_id IS NULL THEN + RETURN NULL; + END IF; + + UPDATE procrastinate_periodic_defers + SET job_id = procrastinate_defer_job( + _queue_name, + _task_name, + _lock, + _queueing_lock, + _args, + NULL + ) + WHERE id = _defer_id + RETURNING job_id INTO _job_id; + + DELETE + FROM procrastinate_periodic_defers + USING ( + SELECT id + FROM procrastinate_periodic_defers + WHERE procrastinate_periodic_defers.task_name = _task_name + AND procrastinate_periodic_defers.periodic_id = _periodic_id + AND procrastinate_periodic_defers.defer_timestamp < _defer_timestamp + ORDER BY id + FOR UPDATE + ) to_delete + WHERE procrastinate_periodic_defers.id = to_delete.id; + + RETURN _job_id; +END; +$$; + + +-- +-- Name: procrastinate_jobs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.procrastinate_jobs ( + id bigint NOT NULL, + queue_name character varying(128) NOT NULL, + task_name character varying(128) NOT NULL, + lock text, + queueing_lock text, + args jsonb DEFAULT '{}'::jsonb NOT NULL, + status public.procrastinate_job_status DEFAULT 'todo'::public.procrastinate_job_status NOT NULL, + scheduled_at timestamp with time zone, + attempts integer DEFAULT 0 NOT NULL +); + + +-- +-- Name: procrastinate_fetch_job(character varying[]); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_fetch_job(target_queue_names character varying[]) RETURNS public.procrastinate_jobs + LANGUAGE plpgsql + AS $$ +DECLARE + found_jobs procrastinate_jobs; +BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + -- reject the job if its lock has earlier jobs + NOT EXISTS ( + SELECT 1 + FROM procrastinate_jobs AS earlier_jobs + WHERE + jobs.lock IS NOT NULL + AND 
earlier_jobs.lock = jobs.lock + AND earlier_jobs.status IN ('todo', 'doing') + AND earlier_jobs.id < jobs.id) + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + + RETURN found_jobs; +END; +$$; + + +-- +-- Name: procrastinate_finish_job(bigint, public.procrastinate_job_status, boolean); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_finish_job(job_id bigint, end_status public.procrastinate_job_status, delete_job boolean) RETURNS void + LANGUAGE plpgsql + AS $$ +DECLARE + _job_id bigint; +BEGIN + IF end_status NOT IN ('succeeded', 'failed') THEN + RAISE 'End status should be either "succeeded" or "failed" (job id: %)', job_id; + END IF; + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = end_status, + attempts = + CASE + WHEN status = 'doing' THEN attempts + 1 + ELSE attempts + END + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "doing" or "todo" status (job id: %)', job_id; + END IF; +END; +$$; + + +-- +-- Name: procrastinate_finish_job(integer, public.procrastinate_job_status, timestamp with time zone, boolean); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_finish_job(job_id integer, end_status public.procrastinate_job_status, next_scheduled_at timestamp with time zone, delete_job boolean) RETURNS void + LANGUAGE plpgsql + AS $$ +DECLARE + _job_id bigint; +BEGIN + IF end_status NOT IN ('succeeded', 
'failed') THEN + RAISE 'End status should be either "succeeded" or "failed" (job id: %)', job_id; + END IF; + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = end_status, + attempts = + CASE + WHEN status = 'doing' THEN attempts + 1 + ELSE attempts + END + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "doing" or "todo" status (job id: %)', job_id; + END IF; +END; +$$; + + +-- +-- Name: procrastinate_notify_queue(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_notify_queue() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN + PERFORM pg_notify('procrastinate_queue#' || NEW.queue_name, NEW.task_name); + PERFORM pg_notify('procrastinate_any_queue', NEW.task_name); + RETURN NEW; +END; +$$; + + +-- +-- Name: procrastinate_retry_job(bigint, timestamp with time zone); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_retry_job(job_id bigint, retry_at timestamp with time zone) RETURNS void + LANGUAGE plpgsql + AS $$ +DECLARE + _job_id bigint; +BEGIN + UPDATE procrastinate_jobs + SET status = 'todo', + attempts = attempts + 1, + scheduled_at = retry_at + WHERE id = job_id AND status = 'doing' + RETURNING id INTO _job_id; + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "doing" status (job id: %)', job_id; + END IF; +END; +$$; + + +-- +-- Name: procrastinate_trigger_scheduled_events_procedure(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_trigger_scheduled_events_procedure() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO procrastinate_events(job_id, type, at) + VALUES (NEW.id, 'scheduled'::procrastinate_job_event_type, NEW.scheduled_at); + + RETURN NEW; +END; +$$; + + +-- +-- Name: 
procrastinate_trigger_status_events_procedure_insert(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_trigger_status_events_procedure_insert() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO procrastinate_events(job_id, type) + VALUES (NEW.id, 'deferred'::procrastinate_job_event_type); + RETURN NEW; +END; +$$; + + +-- +-- Name: procrastinate_trigger_status_events_procedure_update(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_trigger_status_events_procedure_update() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN + WITH t AS ( + SELECT CASE + WHEN OLD.status = 'todo'::procrastinate_job_status + AND NEW.status = 'doing'::procrastinate_job_status + THEN 'started'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'todo'::procrastinate_job_status + THEN 'deferred_for_retry'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'failed'::procrastinate_job_status + THEN 'failed'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'succeeded'::procrastinate_job_status + THEN 'succeeded'::procrastinate_job_event_type + WHEN OLD.status = 'todo'::procrastinate_job_status + AND ( + NEW.status = 'failed'::procrastinate_job_status + OR NEW.status = 'succeeded'::procrastinate_job_status + ) + THEN 'cancelled'::procrastinate_job_event_type + ELSE NULL + END as event_type + ) + INSERT INTO procrastinate_events(job_id, type) + SELECT NEW.id, t.event_type + FROM t + WHERE t.event_type IS NOT NULL; + RETURN NEW; +END; +$$; + + +-- +-- Name: procrastinate_unlink_periodic_defers(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.procrastinate_unlink_periodic_defers() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE procrastinate_periodic_defers + SET job_id = NULL + WHERE job_id = OLD.id; + RETURN OLD; 
+END; +$$; + + -- -- Name: safe_cast_bool(text); Type: FUNCTION; Schema: public; Owner: - -- @@ -885,6 +1235,88 @@ ALTER TABLE public.logger_instrumentation ALTER COLUMN id ADD GENERATED ALWAYS A ); +-- +-- Name: procrastinate_events; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.procrastinate_events ( + id bigint NOT NULL, + job_id bigint NOT NULL, + type public.procrastinate_job_event_type, + at timestamp with time zone DEFAULT now() +); + + +-- +-- Name: procrastinate_events_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.procrastinate_events_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: procrastinate_events_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.procrastinate_events_id_seq OWNED BY public.procrastinate_events.id; + + +-- +-- Name: procrastinate_jobs_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.procrastinate_jobs_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: procrastinate_jobs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.procrastinate_jobs_id_seq OWNED BY public.procrastinate_jobs.id; + + +-- +-- Name: procrastinate_periodic_defers; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.procrastinate_periodic_defers ( + id bigint NOT NULL, + task_name character varying(128) NOT NULL, + defer_timestamp bigint, + job_id bigint, + periodic_id character varying(128) DEFAULT ''::character varying NOT NULL +); + + +-- +-- Name: procrastinate_periodic_defers_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.procrastinate_periodic_defers_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: procrastinate_periodic_defers_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE 
public.procrastinate_periodic_defers_id_seq OWNED BY public.procrastinate_periodic_defers.id; + + -- -- Name: schema_migrations; Type: TABLE; Schema: public; Owner: - -- @@ -894,6 +1326,27 @@ CREATE TABLE public.schema_migrations ( ); +-- +-- Name: procrastinate_events id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.procrastinate_events ALTER COLUMN id SET DEFAULT nextval('public.procrastinate_events_id_seq'::regclass); + + +-- +-- Name: procrastinate_jobs id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.procrastinate_jobs ALTER COLUMN id SET DEFAULT nextval('public.procrastinate_jobs_id_seq'::regclass); + + +-- +-- Name: procrastinate_periodic_defers id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.procrastinate_periodic_defers ALTER COLUMN id SET DEFAULT nextval('public.procrastinate_periodic_defers_id_seq'::regclass); + + -- -- Name: animal animal_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -958,6 +1411,38 @@ ALTER TABLE ONLY public.logger ADD CONSTRAINT logger_pkey PRIMARY KEY (id); +-- +-- Name: procrastinate_events procrastinate_events_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.procrastinate_events + ADD CONSTRAINT procrastinate_events_pkey PRIMARY KEY (id); + + +-- +-- Name: procrastinate_jobs procrastinate_jobs_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.procrastinate_jobs + ADD CONSTRAINT procrastinate_jobs_pkey PRIMARY KEY (id); + + +-- +-- Name: procrastinate_periodic_defers procrastinate_periodic_defers_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.procrastinate_periodic_defers + ADD CONSTRAINT procrastinate_periodic_defers_pkey PRIMARY KEY (id); + + +-- +-- Name: procrastinate_periodic_defers procrastinate_periodic_defers_unique; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.procrastinate_periodic_defers + ADD CONSTRAINT 
procrastinate_periodic_defers_unique UNIQUE (task_name, periodic_id, defer_timestamp); + + -- -- Name: ring ring_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -974,6 +1459,48 @@ ALTER TABLE ONLY public.schema_migrations ADD CONSTRAINT schema_migrations_pkey PRIMARY KEY (version); +-- +-- Name: procrastinate_events_job_id_fkey; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX procrastinate_events_job_id_fkey ON public.procrastinate_events USING btree (job_id); + + +-- +-- Name: procrastinate_jobs_id_lock_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX procrastinate_jobs_id_lock_idx ON public.procrastinate_jobs USING btree (id, lock) WHERE (status = ANY (ARRAY['todo'::public.procrastinate_job_status, 'doing'::public.procrastinate_job_status])); + + +-- +-- Name: procrastinate_jobs_lock_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON public.procrastinate_jobs USING btree (lock) WHERE (status = 'doing'::public.procrastinate_job_status); + + +-- +-- Name: procrastinate_jobs_queue_name_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX procrastinate_jobs_queue_name_idx ON public.procrastinate_jobs USING btree (queue_name); + + +-- +-- Name: procrastinate_jobs_queueing_lock_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON public.procrastinate_jobs USING btree (queueing_lock) WHERE (status = 'todo'::public.procrastinate_job_status); + + +-- +-- Name: procrastinate_periodic_defers_job_id_fkey; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX procrastinate_periodic_defers_job_id_fkey ON public.procrastinate_periodic_defers USING btree (job_id); + + -- -- Name: import import; Type: TRIGGER; Schema: public; Owner: - -- @@ -981,6 +1508,41 @@ ALTER TABLE ONLY public.schema_migrations CREATE TRIGGER import BEFORE INSERT ON public.import FOR EACH ROW EXECUTE FUNCTION public.import(); +-- +-- Name: procrastinate_jobs 
procrastinate_jobs_notify_queue; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER procrastinate_jobs_notify_queue AFTER INSERT ON public.procrastinate_jobs FOR EACH ROW WHEN ((new.status = 'todo'::public.procrastinate_job_status)) EXECUTE FUNCTION public.procrastinate_notify_queue(); + + +-- +-- Name: procrastinate_jobs procrastinate_trigger_delete_jobs; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER procrastinate_trigger_delete_jobs BEFORE DELETE ON public.procrastinate_jobs FOR EACH ROW EXECUTE FUNCTION public.procrastinate_unlink_periodic_defers(); + + +-- +-- Name: procrastinate_jobs procrastinate_trigger_scheduled_events; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER procrastinate_trigger_scheduled_events AFTER INSERT OR UPDATE ON public.procrastinate_jobs FOR EACH ROW WHEN (((new.scheduled_at IS NOT NULL) AND (new.status = 'todo'::public.procrastinate_job_status))) EXECUTE FUNCTION public.procrastinate_trigger_scheduled_events_procedure(); + + +-- +-- Name: procrastinate_jobs procrastinate_trigger_status_events_insert; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER procrastinate_trigger_status_events_insert AFTER INSERT ON public.procrastinate_jobs FOR EACH ROW WHEN ((new.status = 'todo'::public.procrastinate_job_status)) EXECUTE FUNCTION public.procrastinate_trigger_status_events_procedure_insert(); + + +-- +-- Name: procrastinate_jobs procrastinate_trigger_status_events_update; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER procrastinate_trigger_status_events_update AFTER UPDATE OF status ON public.procrastinate_jobs FOR EACH ROW EXECUTE FUNCTION public.procrastinate_trigger_status_events_procedure_update(); + + -- -- Name: chick chick_deployment_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -1021,6 +1583,22 @@ ALTER TABLE ONLY public.logger_instrumentation ADD CONSTRAINT logger_instrumentation_ring_fkey FOREIGN KEY (ring) REFERENCES public.ring(id); +-- +-- Name: 
procrastinate_events procrastinate_events_job_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.procrastinate_events + ADD CONSTRAINT procrastinate_events_job_id_fkey FOREIGN KEY (job_id) REFERENCES public.procrastinate_jobs(id) ON DELETE CASCADE; + + +-- +-- Name: procrastinate_periodic_defers procrastinate_periodic_defers_job_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.procrastinate_periodic_defers + ADD CONSTRAINT procrastinate_periodic_defers_job_id_fkey FOREIGN KEY (job_id) REFERENCES public.procrastinate_jobs(id); + + -- -- Name: ring ring_animal_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/docker-compose.yml b/docker-compose.yml index c84f96e..94731c1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,6 +23,7 @@ services: PGRST_JWT_SECRET: ${JWT_SECRET} PGRST_DB_ANON_ROLE: web_anon wizard: + image: seapop_wizard build: context: wizard environment: @@ -30,6 +31,7 @@ services: POSTGREST_TOKEN: ${WRITER_TOKEN} LOGGING: DEBUG TEST_DATA_PATH: /test_data + DATABASE_URL: postgres://postgres:${POSTGRES_PASSWORD}@postgres:5432/postgres?sslmode=disable tty: true stdin_open: true volumes: @@ -37,6 +39,18 @@ services: - ./data:/data/ - ./wizard/parsers:/app/parsers - ./test_data:/test_data + queue: + image: seapop_wizard + command: pdm run procrastinate --app=tasks.app worker + environment: + POSTGREST_URL: http://postgrest:3000 + POSTGREST_TOKEN: ${WRITER_TOKEN} + LOGGING: DEBUG + TEST_DATA_PATH: /test_data + DATABASE_URL: postgres://postgres:${POSTGRES_PASSWORD}@postgres:5432/postgres?sslmode=disable + volumes: + - ./data:/data/ + - ./wizard/parsers:/app/parsers pgweb: image: sosedoff/pgweb:latest environment: diff --git a/wizard/Dockerfile b/wizard/Dockerfile index 5bd8a1d..cbefb62 100644 --- a/wizard/Dockerfile +++ b/wizard/Dockerfile @@ -9,9 +9,10 @@ COPY pyproject.toml pdm.lock . 
RUN --mount=type=cache,target=/root/.cache/pdm \ pdm install --no-self -COPY wizard.py parser.py . +COPY wizard.py parser.py tasks.py entrypoint.sh . COPY parsers/ parsers/ COPY templates/ templates/ EXPOSE 8000/TCP +ENTRYPOINT [ "./entrypoint.sh" ] CMD ["pdm", "run", "./wizard.py"] diff --git a/wizard/entrypoint.sh b/wizard/entrypoint.sh new file mode 100755 index 0000000..70acdf2 --- /dev/null +++ b/wizard/entrypoint.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +set -ex + +export PYTHONPATH=. +pdm run procrastinate --app=tasks.app schema --apply || true + +exec "$@" diff --git a/wizard/parsers/parser_base.py b/wizard/parsers/parser_base.py index 6e4a432..8d743df 100644 --- a/wizard/parsers/parser_base.py +++ b/wizard/parsers/parser_base.py @@ -95,12 +95,10 @@ def as_table(self) -> pa.Table: def write_parquet(self, path: pathlib.Path, filename: str = None): if filename: filename = pathlib.Path(filename) - if not filename and self.stream.name: - filename = pathlib.Path(self.stream.name) else: - raise Exception("Stream has no name and no filename is provided") + filename = self.file._file_path.name - pq.write_table(self.as_table(), str(path / f'{filename.stem}.parquet')) + pq.write_table(self.as_table(), str(path / f'{filename}.parquet')) def write_csv(self, path): pacsv.write_csv(self.as_table(), str(path)) diff --git a/wizard/pdm.lock b/wizard/pdm.lock index e79d5e2..66fc174 100644 --- a/wizard/pdm.lock +++ b/wizard/pdm.lock @@ -4,8 +4,61 @@ [metadata] groups = ["default", "dev"] strategy = ["cross_platform"] -lock_version = "4.4" -content_hash = "sha256:f221c0f86e96e886da92df79dcf494cf8b6b2de7070b6da0295dcfa5e8b447b4" +lock_version = "4.4.1" +content_hash = "sha256:6a6063030bf29161cc76091cdc97c734d09a92d3cb8e6ed2c1e86c385e00703b" + +[[package]] +name = "anyio" +version = "4.4.0" +requires_python = ">=3.8" +summary = "High level compatibility layer for multiple asynchronous event loop implementations" +dependencies = [ + "exceptiongroup>=1.0.2; python_version < \"3.11\"", + 
"idna>=2.8", + "sniffio>=1.1", + "typing-extensions>=4.1; python_version < \"3.11\"", +] +files = [ + {file = "anyio-4.4.0-py3-none-any.whl", hash = "sha256:c1b2d8f46a8a812513012e1107cb0e68c17159a7a594208005a57dc776e1bdc7"}, + {file = "anyio-4.4.0.tar.gz", hash = "sha256:5aadc6a1bbb7cdb0bede386cac5e2940f5e2ff3aa20277e991cf028e0585ce94"}, +] + +[[package]] +name = "asgiref" +version = "3.8.1" +requires_python = ">=3.8" +summary = "ASGI specs, helper code, and adapters" +dependencies = [ + "typing-extensions>=4; python_version < \"3.11\"", +] +files = [ + {file = "asgiref-3.8.1-py3-none-any.whl", hash = "sha256:3e1e3ecc849832fe52ccf2cb6686b7a55f82bb1d6aee72a58826471390335e47"}, + {file = "asgiref-3.8.1.tar.gz", hash = "sha256:c343bd80a0bec947a9860adb4c432ffa7db769836c64238fc34bdc3fec84d590"}, +] + +[[package]] +name = "attrs" +version = "23.2.0" +requires_python = ">=3.7" +summary = "Classes Without Boilerplate" +files = [ + {file = "attrs-23.2.0-py3-none-any.whl", hash = "sha256:99b87a485a5820b23b879f04c2305b44b951b502fd64be915879d77a7e8fc6f1"}, + {file = "attrs-23.2.0.tar.gz", hash = "sha256:935dc3b529c262f6cf76e50877d35a4bd3c1de194fd41f47a2b7ae8f19971f30"}, +] + +[[package]] +name = "backports-zoneinfo" +version = "0.2.1" +requires_python = ">=3.6" +summary = "Backport of the standard library zoneinfo module" +files = [ + {file = "backports.zoneinfo-0.2.1-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:8961c0f32cd0336fb8e8ead11a1f8cd99ec07145ec2931122faaac1c8f7fd987"}, + {file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:e81b76cace8eda1fca50e345242ba977f9be6ae3945af8d46326d776b4cf78d1"}, + {file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:7b0a64cda4145548fed9efc10322770f929b944ce5cee6c0dfe0c87bf4c0c8c9"}, + {file = "backports.zoneinfo-0.2.1-cp38-cp38-win32.whl", hash = "sha256:1b13e654a55cd45672cb54ed12148cd33628f672548f373963b0bff67b217328"}, + {file = 
"backports.zoneinfo-0.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:4a0f800587060bf8880f954dbef70de6c11bbe59c673c3d818921f042f9954a6"}, + {file = "backports.zoneinfo-0.2.1.tar.gz", hash = "sha256:fadbfe37f74051d024037f223b8e001611eac868b5c5b06144ef4d8b799862f2"}, +] [[package]] name = "certifi" @@ -107,6 +160,30 @@ files = [ {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "contextlib2" +version = "21.6.0" +requires_python = ">=3.6" +summary = "Backports and enhancements for the contextlib module" +files = [ + {file = "contextlib2-21.6.0-py2.py3-none-any.whl", hash = "sha256:3fbdb64466afd23abaf6c977627b75b6139a5a3e8ce38405c5b413aed7a0471f"}, + {file = "contextlib2-21.6.0.tar.gz", hash = "sha256:ab1e2bfe1d01d968e1b7e8d9023bc51ef3509bba217bb730cee3827e1ee82869"}, +] + +[[package]] +name = "croniter" +version = "2.0.5" +requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.6" +summary = "croniter provides iteration for datetime object with cron like format" +dependencies = [ + "python-dateutil", + "pytz>2021.1", +] +files = [ + {file = "croniter-2.0.5-py2.py3-none-any.whl", hash = "sha256:fdbb44920944045cc323db54599b321325141d82d14fa7453bc0699826bbe9ed"}, + {file = "croniter-2.0.5.tar.gz", hash = "sha256:f1f8ca0af64212fbe99b1bee125ee5a1b53a9c1b433968d8bca8817b79d237f3"}, +] + [[package]] name = "et-xmlfile" version = "1.1.0" @@ -147,6 +224,19 @@ files = [ {file = "idna-3.4.tar.gz", hash = "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4"}, ] +[[package]] +name = "importlib-resources" +version = "6.4.0" +requires_python = ">=3.8" +summary = "Read resources from Python packages" +dependencies = [ + "zipp>=3.1.0; python_version < \"3.10\"", +] +files = [ + {file = "importlib_resources-6.4.0-py3-none-any.whl", hash = "sha256:50d10f043df931902d4194ea07ec57960f66a80449ff867bfe782b4c486ba78c"}, + {file = "importlib_resources-6.4.0.tar.gz", hash = 
"sha256:cdb2b453b8046ca4e3798eb1d84f3cce1446a0e8e7b5ef4efb600f19fc398145"}, +] + [[package]] name = "iniconfig" version = "2.0.0" @@ -397,6 +487,69 @@ files = [ {file = "pluggy-1.3.0.tar.gz", hash = "sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12"}, ] +[[package]] +name = "procrastinate" +version = "2.3.0" +requires_python = "<4.0,>=3.8" +summary = "Postgres-based distributed task processing library" +dependencies = [ + "anyio", + "asgiref", + "attrs", + "contextlib2; python_version < \"3.10\"", + "croniter", + "importlib-resources>=1.4; python_version < \"3.9\"", + "psycopg[pool]<4.0.0,>=3.1.13", + "python-dateutil", +] +files = [ + {file = "procrastinate-2.3.0-py3-none-any.whl", hash = "sha256:69fb384b5c9b3973f6e2d22d1b430e400925519e4682eb7f8f326379796aa484"}, + {file = "procrastinate-2.3.0.tar.gz", hash = "sha256:3a02c4f1df4e9701677b84aba6e4a4c7963df8ddbdb871b08d2ca25f2ba9c01b"}, +] + +[[package]] +name = "psycopg" +version = "3.1.19" +requires_python = ">=3.7" +summary = "PostgreSQL database adapter for Python" +dependencies = [ + "backports-zoneinfo>=0.2.0; python_version < \"3.9\"", + "typing-extensions>=4.1", + "tzdata; sys_platform == \"win32\"", +] +files = [ + {file = "psycopg-3.1.19-py3-none-any.whl", hash = "sha256:dca5e5521c859f6606686432ae1c94e8766d29cc91f2ee595378c510cc5b0731"}, + {file = "psycopg-3.1.19.tar.gz", hash = "sha256:92d7b78ad82426cdcf1a0440678209faa890c6e1721361c2f8901f0dccd62961"}, +] + +[[package]] +name = "psycopg-pool" +version = "3.2.2" +requires_python = ">=3.8" +summary = "Connection Pool for Psycopg" +dependencies = [ + "typing-extensions>=4.4", +] +files = [ + {file = "psycopg_pool-3.2.2-py3-none-any.whl", hash = "sha256:273081d0fbfaced4f35e69200c89cb8fbddfe277c38cc86c235b90a2ec2c8153"}, + {file = "psycopg_pool-3.2.2.tar.gz", hash = "sha256:9e22c370045f6d7f2666a5ad1b0caf345f9f1912195b0b25d0d3bcc4f3a7389c"}, +] + +[[package]] +name = "psycopg" +version = "3.1.19" +extras = ["pool"] +requires_python = 
">=3.7" +summary = "PostgreSQL database adapter for Python" +dependencies = [ + "psycopg-pool", + "psycopg==3.1.19", +] +files = [ + {file = "psycopg-3.1.19-py3-none-any.whl", hash = "sha256:dca5e5521c859f6606686432ae1c94e8766d29cc91f2ee595378c510cc5b0731"}, + {file = "psycopg-3.1.19.tar.gz", hash = "sha256:92d7b78ad82426cdcf1a0440678209faa890c6e1721361c2f8901f0dccd62961"}, +] + [[package]] name = "pyarrow" version = "14.0.2" @@ -536,6 +689,16 @@ files = [ {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, ] +[[package]] +name = "sniffio" +version = "1.3.1" +requires_python = ">=3.7" +summary = "Sniff out which async library your code is running under" +files = [ + {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, + {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, +] + [[package]] name = "tomli" version = "2.0.1" @@ -565,6 +728,16 @@ files = [ {file = "tornado-6.3.2.tar.gz", hash = "sha256:4b927c4f19b71e627b13f3db2324e4ae660527143f9e1f2e2fb404f3a187e2ba"}, ] +[[package]] +name = "typing-extensions" +version = "4.12.0" +requires_python = ">=3.8" +summary = "Backported and Experimental Type Hints for Python 3.8+" +files = [ + {file = "typing_extensions-4.12.0-py3-none-any.whl", hash = "sha256:b349c66bea9016ac22978d800cfff206d5f9816951f12a7d0ec5578b0a819594"}, + {file = "typing_extensions-4.12.0.tar.gz", hash = "sha256:8cbcdc8606ebcb0d95453ad7dc5065e6237b6aa230a31e81d0f440c30fed5fd8"}, +] + [[package]] name = "tzdata" version = "2023.4" @@ -605,3 +778,13 @@ files = [ {file = "user-agents-2.2.0.tar.gz", hash = "sha256:d36d25178db65308d1458c5fa4ab39c9b2619377010130329f3955e7626ead26"}, {file = "user_agents-2.2.0-py3-none-any.whl", hash = "sha256:a98c4dc72ecbc64812c4534108806fb0a0b3a11ec3fd1eafe807cee5b0a942e7"}, ] + +[[package]] +name = "zipp" +version = "3.19.0" 
+requires_python = ">=3.8"
+summary = "Backport of pathlib-compatible object wrapper for zip files"
+files = [
+    {file = "zipp-3.19.0-py3-none-any.whl", hash = "sha256:96dc6ad62f1441bcaccef23b274ec471518daf4fbbc580341204936a5a3dddec"},
+    {file = "zipp-3.19.0.tar.gz", hash = "sha256:952df858fb3164426c976d9338d3961e8e8b3758e2e059e0f754b8c4262625ee"},
+]
diff --git a/wizard/pyproject.toml b/wizard/pyproject.toml
index 40d6971..54c51b1 100644
--- a/wizard/pyproject.toml
+++ b/wizard/pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
     "pandas>=2.0.3",
     "chardet>=5.2.0",
     "sentry-sdk>=1.40.5",
+    "procrastinate",
 ]
 requires-python = ">=3.8"
 license = {text = "GPLv3"}
diff --git a/wizard/tasks.py b/wizard/tasks.py
new file mode 100644
index 0000000..d0d1929
--- /dev/null
+++ b/wizard/tasks.py
@@ -0,0 +1,57 @@
+"""Background tasks executed by the procrastinate worker."""
+import os
+import pathlib
+
+from procrastinate import App, PsycopgConnector
+from procrastinate.exceptions import AlreadyEnqueued
+
+from parsers.parser import detect_file
+
+DATABASE_URL = os.getenv("DATABASE_URL")
+
+DATA_PATH = pathlib.Path(os.getenv("DATA_PATH", '/data/'))
+
+LOGGERS_PATH = DATA_PATH / 'loggers'
+SPREADSHEETS_PATH = DATA_PATH / 'metadata'
+PARQUET_PATH = DATA_PATH / 'parquet'
+
+if not DATABASE_URL:
+    raise Exception('Missing DATABASE_URL')
+
+app = App(
+    connector=PsycopgConnector(
+        conninfo=DATABASE_URL
+    )
+)
+
+
+@app.task(name='to_parquet')
+def to_parquet(file_path: str):
+    """Detect the parser for file_path and write its table to PARQUET_PATH."""
+    parser = detect_file(path=pathlib.Path(file_path))
+    parser.write_parquet(PARQUET_PATH)
+
+
+@app.periodic(cron="* * * * *")
+@app.task(name='check_missing')
+def check_missing(timestamp: int):
+    """Queue a to_parquet job for each logger file without a parquet copy.
+
+    This runs every minute, so a file whose conversion is still pending is
+    seen again on every run. queueing_lock makes the database reject a
+    second waiting job for the same file; lock keeps two conversions of
+    one file from running at the same time.
+    """
+    if not LOGGERS_PATH.is_dir():
+        # No upload directory yet: iterdir() would raise and fail the job.
+        return
+    for f in LOGGERS_PATH.iterdir():
+        if not (PARQUET_PATH / (f.name + '.parquet')).exists():
+            print(f.name + ' not found, adding a task to generate it')
+            try:
+                to_parquet.configure(
+                    lock=f.name, queueing_lock=f.name
+                ).defer(file_path=str(f))
+            except AlreadyEnqueued:
+                # A conversion of this file is already waiting in the queue.
+                pass
diff --git a/wizard/wizard.py b/wizard/wizard.py
index 6654614..b5ce9f5 100755
--- a/wizard/wizard.py
+++ b/wizard/wizard.py
@@ -22,6 +22,8 @@ from pywebio.session import run_js
 from jinja2 import Environment,
FileSystemLoader, select_autoescape +from tasks import to_parquet, app + if os.getenv("SENTRY_DSN"): import sentry_sdk sentry_sdk.init( @@ -123,24 +125,25 @@ def put_reload_button(): ''' def wizard(): - put_widget(tpl, {'contents': [ - put_link('Explore Database', '/pgweb/', new_window=True), - put_link('Rest APIs', '/postgrest/', new_window=True), - put_link('Uploaded Data', '/data/', new_window=True), - ]}) - - result = actions( - "What you want to upload?", - buttons=[ - {"value": "metadata", "type": "submit", "label": "Metadata"}, - {"value": "loggers", "type": "submit", "label": "Loggers"}, - ]) - if result == "metadata": - handle_metadata() - elif result == "loggers": - handle_loggers() - else: - put_reload_button() + with app.open(): + put_widget(tpl, {'contents': [ + put_link('Explore Database', '/pgweb/', new_window=True), + put_link('Rest APIs', '/postgrest/', new_window=True), + put_link('Uploaded Data', '/data/', new_window=True), + ]}) + + result = actions( + "What you want to upload?", + buttons=[ + {"value": "metadata", "type": "submit", "label": "Metadata"}, + {"value": "loggers", "type": "submit", "label": "Loggers"}, + ]) + if result == "metadata": + handle_metadata() + elif result == "loggers": + handle_loggers() + else: + put_reload_button() if os.getenv("SENTRY_DSN"): wizard = config(js_file="/data/scripts/sentry.js")(wizard) @@ -295,6 +298,7 @@ def handle_loggers(): put_error(f"Logger data {filename}: {traceback.format_exc()}") else: shutil.move(temp_path, definitive_path) + to_parquet.configure(lock=definitive_path.name).defer(file_path=str(definitive_path)) put_success(f"Logger data {filename} have been imported sucessfully.") shutil.rmtree(TEMP_DIR)