From 41d531a114c7d3ee3b84c208eeb65e39bbc7c255 Mon Sep 17 00:00:00 2001 From: Jaroslaw Michalski Date: Tue, 4 Jul 2023 14:27:41 +0100 Subject: [PATCH 1/3] modify sql query to get data for all areas The query gets the latest 3 days of data for each area_id. --- .../converter.py | 43 ++++++------ main_etl_nested_metrics_converter/queries.py | 69 ++++++++++--------- 2 files changed, 60 insertions(+), 52 deletions(-) diff --git a/main_etl_nested_metrics_converter/converter.py b/main_etl_nested_metrics_converter/converter.py index b07c0f6..57c1e1b 100644 --- a/main_etl_nested_metrics_converter/converter.py +++ b/main_etl_nested_metrics_converter/converter.py @@ -124,7 +124,7 @@ def from_sql(partition: str, cutoff_date: datetime): This gets all the vaccination data needed from the DB :param partition: it's a 'partition_id' used in the SQL query string - :param release_dates: the latest release dates + :param cutoff_date: this defines the oldest data we're insteredted in :return: values from DB :rtype: list @@ -133,25 +133,15 @@ def from_sql(partition: str, cutoff_date: datetime): connection = session.connection() try: - # extend time range (days) used in the query until the data has been retrieved - # with the limit to 42 days (35 + 7 of cutoff_day) - for days in range(0, 36, 7): - date = cutoff_date - timedelta(days=days) - logging.info(f"Using this date for VACCINATIONS_QUERY: {str(date)}") - values_query = queries.VACCINATIONS_QUERY.format( - partition=partition, - date=date, - ) - - resp = connection.execute( - text(values_query), - ) - values = [TimeSeriesData(*record) for record in resp.fetchall()] - - if values: - break + query = queries.VACCINATIONS_QUERY.format( + partition=partition, + date=cutoff_date, + ) - logging.info(f"No data retrieved when used {str(date)} date in the query") + resp = connection.execute( + text(query), + ) + values = [TimeSeriesData(*record) for record in resp.fetchall()] except Exception as err: session.rollback() raise err @@ -336,9 +326,10 @@ def main(rawtimestamp: str) -> str: # Set the 'cutoff_date' to define (with current_release_datestamp) the time range # to use in the sql query. It will be dynamically extended in the from_sql() function, # as it crucial to get the data that is then used in many other parts of the project. - cutoff_date = current_release_datestamp - timedelta(days=14) + cutoff_date = current_release_datestamp - timedelta(days=60) # Retrieving data (since the previous release) --------------------------------------- + logging.info("Getting data from DB") values = from_sql(partition, cutoff_date) if values: @@ -364,4 +355,14 @@ def main(rawtimestamp: str) -> str: # # This is not needed for prod, but useful for local development # if __name__ == '__main__': -# main("2023-06-22T16:15:14.123456") +# from sys import stdout + +# root = logging.getLogger() +# root.setLevel(logging.DEBUG) +# handler = logging.StreamHandler(stdout) +# handler.setLevel(logging.DEBUG) +# formatter = logging.Formatter('[%(asctime)s] %(levelname)s | %(message)s') +# handler.setFormatter(formatter) +# root.addHandler(handler) + +# main("2023-06-29T16:15:14.123456") diff --git a/main_etl_nested_metrics_converter/queries.py b/main_etl_nested_metrics_converter/queries.py index 289ea2b..9b4f8fa 100644 --- a/main_etl_nested_metrics_converter/queries.py +++ b/main_etl_nested_metrics_converter/queries.py @@ -16,35 +16,42 @@ """ VACCINATIONS_QUERY = """\ -SELECT partition_id, release_id, area_id, date, payload -FROM ( - SELECT * - FROM covid19.time_series_p{partition}_other AS tsother - JOIN covid19.release_reference AS rr ON rr.id = release_id - JOIN covid19.metric_reference AS mr ON mr.id = metric_id - JOIN covid19.area_reference AS ar ON ar.id = tsother.area_id - WHERE metric = 'vaccinationsAgeDemographics' - AND date > ( DATE('{date}')) - UNION - ( - SELECT * - FROM covid19.time_series_p{partition}_utla AS tsutla - JOIN covid19.release_reference AS rr ON rr.id = release_id - JOIN covid19.metric_reference AS mr ON mr.id = metric_id - JOIN covid19.area_reference AS ar ON ar.id = tsutla.area_id - WHERE metric = 'vaccinationsAgeDemographics' - AND date > ( DATE('{date}')) - ) - UNION - ( - SELECT * - FROM covid19.time_series_p{partition}_ltla AS tsltla - JOIN covid19.release_reference AS rr ON rr.id = release_id - JOIN covid19.metric_reference AS mr ON mr.id = metric_id - JOIN covid19.area_reference AS ar ON ar.id = tsltla.area_id - WHERE metric = 'vaccinationsAgeDemographics' - AND date > ( DATE('{date}')) - ) - ) AS tsltla -ORDER BY date DESC;\ +SELECT partition_id, release_id, area_id, date, payload FROM ( + SELECT partition_id, release_id, area_id, date, payload, + RANK() OVER ( + PARTITION BY area_id + ORDER BY date DESC + ) AS area_latest + FROM ( + SELECT * + FROM covid19.time_series_p{partition}_other AS tsother + JOIN covid19.release_reference AS rr ON rr.id = release_id + JOIN covid19.metric_reference AS mr ON mr.id = metric_id + JOIN covid19.area_reference AS ar ON ar.id = tsother.area_id + WHERE metric = 'vaccinationsAgeDemographics' + AND date > ( DATE('{date}')) + UNION + ( + SELECT * + FROM covid19.time_series_p{partition}_utla AS tsutla + JOIN covid19.release_reference AS rr ON rr.id = release_id + JOIN covid19.metric_reference AS mr ON mr.id = metric_id + JOIN covid19.area_reference AS ar ON ar.id = tsutla.area_id + WHERE metric = 'vaccinationsAgeDemographics' + AND date > ( DATE('{date}')) + ) + UNION + ( + SELECT * + FROM covid19.time_series_p{partition}_ltla AS tsltla + JOIN covid19.release_reference AS rr ON rr.id = release_id + JOIN covid19.metric_reference AS mr ON mr.id = metric_id + JOIN covid19.area_reference AS ar ON ar.id = tsltla.area_id + WHERE metric = 'vaccinationsAgeDemographics' + AND date > ( DATE('{date}')) + ) + ) AS ts + ORDER BY area_id + ) AS foo +WHERE area_latest < 4; """ \ No newline at end of file From e658a78a5e9affaeb7911a6b462dbe268aed047d Mon Sep 17 00:00:00 2001 From: Jaroslaw Michalski Date: Tue, 4 Jul 2023 15:55:42 +0100 Subject: [PATCH 2/3] set cut off date to 2023-05-04 --- .../converter.py | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/main_etl_nested_metrics_converter/converter.py b/main_etl_nested_metrics_converter/converter.py index 57c1e1b..963aae2 100644 --- a/main_etl_nested_metrics_converter/converter.py +++ b/main_etl_nested_metrics_converter/converter.py @@ -1,17 +1,17 @@ #!/usr/bin python3 -# # This commented out section was used in the local development -# import pathlib -# import site +# This commented out section was used in the local development +import pathlib +import site -# test_dir = pathlib.Path(__file__).resolve().parent -# root_path = test_dir.parent -# site.addsitedir(root_path) +test_dir = pathlib.Path(__file__).resolve().parent +root_path = test_dir.parent +site.addsitedir(root_path) import json import logging from collections import namedtuple -from datetime import date, datetime, timedelta +from datetime import date, datetime from hashlib import blake2s from os import getenv from sqlalchemy import column, select, text @@ -323,10 +323,8 @@ def main(rawtimestamp: str) -> str: partition = f"{current_release_datestamp:%Y_%-m_%-d}" logging.info(f"The partition id (date related part): {partition}") - # Set the 'cutoff_date' to define (with current_release_datestamp) the time range - # to use in the sql query. It will be dynamically extended in the from_sql() function, - # as it crucial to get the data that is then used in many other parts of the project. - cutoff_date = current_release_datestamp - timedelta(days=60) + # Set 2023-05-04 as cut off date used in the SQL query + cutoff_date = date(year=2023, month=5, day=4) # Retrieving data (since the previous release) --------------------------------------- logging.info("Getting data from DB") @@ -353,16 +351,16 @@ def main(rawtimestamp: str) -> str: ) -# # This is not needed for prod, but useful for local development -# if __name__ == '__main__': -# from sys import stdout +# This is not needed for prod, but useful for local development +if __name__ == '__main__': + from sys import stdout -# root = logging.getLogger() -# root.setLevel(logging.DEBUG) -# handler = logging.StreamHandler(stdout) -# handler.setLevel(logging.DEBUG) -# formatter = logging.Formatter('[%(asctime)s] %(levelname)s | %(message)s') -# handler.setFormatter(formatter) -# root.addHandler(handler) + root = logging.getLogger() + root.setLevel(logging.DEBUG) + handler = logging.StreamHandler(stdout) + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter('[%(asctime)s] %(levelname)s | %(message)s') + handler.setFormatter(formatter) + root.addHandler(handler) -# main("2023-06-29T16:15:14.123456") + main("2023-06-29T16:15:14.123456") From bfdb1edd75293cacc5e590b136c3e6ea27af7f6b Mon Sep 17 00:00:00 2001 From: Jaroslaw Michalski Date: Tue, 4 Jul 2023 16:00:01 +0100 Subject: [PATCH 3/3] comment out some lines --- .../converter.py | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/main_etl_nested_metrics_converter/converter.py b/main_etl_nested_metrics_converter/converter.py index 963aae2..1f86907 100644 --- a/main_etl_nested_metrics_converter/converter.py +++ b/main_etl_nested_metrics_converter/converter.py @@ -1,12 +1,12 @@ #!/usr/bin python3 -# This commented out section was used in the local development -import pathlib -import site +# # This commented out section was used in the local development +# import pathlib +# import site -test_dir = pathlib.Path(__file__).resolve().parent -root_path = test_dir.parent -site.addsitedir(root_path) +# test_dir = pathlib.Path(__file__).resolve().parent +# root_path = test_dir.parent +# site.addsitedir(root_path) import json import logging @@ -351,16 +351,16 @@ def main(rawtimestamp: str) -> str: ) -# This is not needed for prod, but useful for local development -if __name__ == '__main__': - from sys import stdout +# # This is not needed for prod, but useful for local development +# if __name__ == '__main__': +# from sys import stdout - root = logging.getLogger() - root.setLevel(logging.DEBUG) - handler = logging.StreamHandler(stdout) - handler.setLevel(logging.DEBUG) - formatter = logging.Formatter('[%(asctime)s] %(levelname)s | %(message)s') - handler.setFormatter(formatter) - root.addHandler(handler) +# root = logging.getLogger() +# root.setLevel(logging.DEBUG) +# handler = logging.StreamHandler(stdout) +# handler.setLevel(logging.DEBUG) +# formatter = logging.Formatter('[%(asctime)s] %(levelname)s | %(message)s') +# handler.setFormatter(formatter) +# root.addHandler(handler) - main("2023-06-29T16:15:14.123456") +# main("2023-06-29T16:15:14.123456")