diff --git a/main_etl_nested_metrics_converter/converter.py b/main_etl_nested_metrics_converter/converter.py index b07c0f6..1f86907 100644 --- a/main_etl_nested_metrics_converter/converter.py +++ b/main_etl_nested_metrics_converter/converter.py @@ -11,7 +11,7 @@ import json import logging from collections import namedtuple -from datetime import date, datetime, timedelta +from datetime import date, datetime from hashlib import blake2s from os import getenv from sqlalchemy import column, select, text @@ -124,7 +124,7 @@ def from_sql(partition: str, cutoff_date: datetime): This gets all the vaccination data needed from the DB :param partition: it's a 'partition_id' used in the SQL query string - :param release_dates: the latest release dates + :param cutoff_date: this defines the oldest data we're insteredted in :return: values from DB :rtype: list @@ -133,25 +133,15 @@ def from_sql(partition: str, cutoff_date: datetime): connection = session.connection() try: - # extend time range (days) used in the query until the data has been retrieved - # with the limit to 42 days (35 + 7 of cutoff_day) - for days in range(0, 36, 7): - date = cutoff_date - timedelta(days=days) - logging.info(f"Using this date for VACCINATIONS_QUERY: {str(date)}") - values_query = queries.VACCINATIONS_QUERY.format( - partition=partition, - date=date, - ) - - resp = connection.execute( - text(values_query), - ) - values = [TimeSeriesData(*record) for record in resp.fetchall()] - - if values: - break + query = queries.VACCINATIONS_QUERY.format( + partition=partition, + date=cutoff_date, + ) - logging.info(f"No data retrieved when used {str(date)} date in the query") + resp = connection.execute( + text(query), + ) + values = [TimeSeriesData(*record) for record in resp.fetchall()] except Exception as err: session.rollback() raise err @@ -333,12 +323,11 @@ def main(rawtimestamp: str) -> str: partition = f"{current_release_datestamp:%Y_%-m_%-d}" logging.info(f"The partition id (date related part): {partition}") - # Set the 'cutoff_date' to define (with current_release_datestamp) the time range - # to use in the sql query. It will be dynamically extended in the from_sql() function, - # as it crucial to get the data that is then used in many other parts of the project. - cutoff_date = current_release_datestamp - timedelta(days=14) + # Set 2023-05-04 as cut off date used in the SQL query + cutoff_date = date(year=2023, month=5, day=4) # Retrieving data (since the previous release) --------------------------------------- + logging.info("Getting data from DB") values = from_sql(partition, cutoff_date) if values: @@ -364,4 +353,14 @@ def main(rawtimestamp: str) -> str: # # This is not needed for prod, but useful for local development # if __name__ == '__main__': -# main("2023-06-22T16:15:14.123456") +# from sys import stdout + +# root = logging.getLogger() +# root.setLevel(logging.DEBUG) +# handler = logging.StreamHandler(stdout) +# handler.setLevel(logging.DEBUG) +# formatter = logging.Formatter('[%(asctime)s] %(levelname)s | %(message)s') +# handler.setFormatter(formatter) +# root.addHandler(handler) + +# main("2023-06-29T16:15:14.123456") diff --git a/main_etl_nested_metrics_converter/queries.py b/main_etl_nested_metrics_converter/queries.py index 289ea2b..9b4f8fa 100644 --- a/main_etl_nested_metrics_converter/queries.py +++ b/main_etl_nested_metrics_converter/queries.py @@ -16,35 +16,42 @@ """ VACCINATIONS_QUERY = """\ -SELECT partition_id, release_id, area_id, date, payload -FROM ( - SELECT * - FROM covid19.time_series_p{partition}_other AS tsother - JOIN covid19.release_reference AS rr ON rr.id = release_id - JOIN covid19.metric_reference AS mr ON mr.id = metric_id - JOIN covid19.area_reference AS ar ON ar.id = tsother.area_id - WHERE metric = 'vaccinationsAgeDemographics' - AND date > ( DATE('{date}')) - UNION - ( - SELECT * - FROM covid19.time_series_p{partition}_utla AS tsutla - JOIN covid19.release_reference AS rr ON rr.id = release_id - JOIN covid19.metric_reference AS mr ON mr.id = metric_id - JOIN covid19.area_reference AS ar ON ar.id = tsutla.area_id - WHERE metric = 'vaccinationsAgeDemographics' - AND date > ( DATE('{date}')) - ) - UNION - ( - SELECT * - FROM covid19.time_series_p{partition}_ltla AS tsltla - JOIN covid19.release_reference AS rr ON rr.id = release_id - JOIN covid19.metric_reference AS mr ON mr.id = metric_id - JOIN covid19.area_reference AS ar ON ar.id = tsltla.area_id - WHERE metric = 'vaccinationsAgeDemographics' - AND date > ( DATE('{date}')) - ) - ) AS tsltla -ORDER BY date DESC;\ +SELECT partition_id, release_id, area_id, date, payload FROM ( + SELECT partition_id, release_id, area_id, date, payload, + RANK() OVER ( + PARTITION BY area_id + ORDER BY date DESC + ) AS area_latest + FROM ( + SELECT * + FROM covid19.time_series_p{partition}_other AS tsother + JOIN covid19.release_reference AS rr ON rr.id = release_id + JOIN covid19.metric_reference AS mr ON mr.id = metric_id + JOIN covid19.area_reference AS ar ON ar.id = tsother.area_id + WHERE metric = 'vaccinationsAgeDemographics' + AND date > ( DATE('{date}')) + UNION + ( + SELECT * + FROM covid19.time_series_p{partition}_utla AS tsutla + JOIN covid19.release_reference AS rr ON rr.id = release_id + JOIN covid19.metric_reference AS mr ON mr.id = metric_id + JOIN covid19.area_reference AS ar ON ar.id = tsutla.area_id + WHERE metric = 'vaccinationsAgeDemographics' + AND date > ( DATE('{date}')) + ) + UNION + ( + SELECT * + FROM covid19.time_series_p{partition}_ltla AS tsltla + JOIN covid19.release_reference AS rr ON rr.id = release_id + JOIN covid19.metric_reference AS mr ON mr.id = metric_id + JOIN covid19.area_reference AS ar ON ar.id = tsltla.area_id + WHERE metric = 'vaccinationsAgeDemographics' + AND date > ( DATE('{date}')) + ) + ) AS ts + ORDER BY area_id + ) AS foo +WHERE area_latest < 4; """ \ No newline at end of file