Skip to content

Commit

Permalink
Merge pull request #191 from UKHSA-Internal/COVP-154_further_improvem…
Browse files Browse the repository at this point in the history
…ents

COVP-154 Issue with "NA" showing for spring boosters in the devolved administrations
  • Loading branch information
adebayoolabintan committed Jul 5, 2023
2 parents 92c4885 + bfdb1ed commit 75ee9b0
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 56 deletions.
49 changes: 24 additions & 25 deletions main_etl_nested_metrics_converter/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import json
import logging
from collections import namedtuple
from datetime import date, datetime, timedelta
from datetime import date, datetime
from hashlib import blake2s
from os import getenv
from sqlalchemy import column, select, text
Expand Down Expand Up @@ -124,7 +124,7 @@ def from_sql(partition: str, cutoff_date: datetime):
This gets all the vaccination data needed from the DB
:param partition: it's a 'partition_id' used in the SQL query string
:param release_dates: the latest release dates
:param cutoff_date: this defines the oldest data we're insteredted in
:return: values from DB
:rtype: list
Expand All @@ -133,25 +133,15 @@ def from_sql(partition: str, cutoff_date: datetime):
connection = session.connection()

try:
# extend time range (days) used in the query until the data has been retrieved
# with the limit to 42 days (35 + 7 of cutoff_day)
for days in range(0, 36, 7):
date = cutoff_date - timedelta(days=days)
logging.info(f"Using this date for VACCINATIONS_QUERY: {str(date)}")
values_query = queries.VACCINATIONS_QUERY.format(
partition=partition,
date=date,
)

resp = connection.execute(
text(values_query),
)
values = [TimeSeriesData(*record) for record in resp.fetchall()]

if values:
break
query = queries.VACCINATIONS_QUERY.format(
partition=partition,
date=cutoff_date,
)

logging.info(f"No data retrieved when used {str(date)} date in the query")
resp = connection.execute(
text(query),
)
values = [TimeSeriesData(*record) for record in resp.fetchall()]
except Exception as err:
session.rollback()
raise err
Expand Down Expand Up @@ -333,12 +323,11 @@ def main(rawtimestamp: str) -> str:
partition = f"{current_release_datestamp:%Y_%-m_%-d}"
logging.info(f"The partition id (date related part): {partition}")

# Set the 'cutoff_date' to define (with current_release_datestamp) the time range
# to use in the sql query. It will be dynamically extended in the from_sql() function,
# as it crucial to get the data that is then used in many other parts of the project.
cutoff_date = current_release_datestamp - timedelta(days=14)
# Set 2023-05-04 as cut off date used in the SQL query
cutoff_date = date(year=2023, month=5, day=4)

# Retrieving data (since the previous release) ---------------------------------------
logging.info("Getting data from DB")
values = from_sql(partition, cutoff_date)

if values:
Expand All @@ -364,4 +353,14 @@ def main(rawtimestamp: str) -> str:

# # This is not needed for prod, but useful for local development
# if __name__ == '__main__':
# main("2023-06-22T16:15:14.123456")
# from sys import stdout

# root = logging.getLogger()
# root.setLevel(logging.DEBUG)
# handler = logging.StreamHandler(stdout)
# handler.setLevel(logging.DEBUG)
# formatter = logging.Formatter('[%(asctime)s] %(levelname)s | %(message)s')
# handler.setFormatter(formatter)
# root.addHandler(handler)

# main("2023-06-29T16:15:14.123456")
69 changes: 38 additions & 31 deletions main_etl_nested_metrics_converter/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,42 @@
"""

VACCINATIONS_QUERY = """\
SELECT partition_id, release_id, area_id, date, payload
FROM (
SELECT *
FROM covid19.time_series_p{partition}_other AS tsother
JOIN covid19.release_reference AS rr ON rr.id = release_id
JOIN covid19.metric_reference AS mr ON mr.id = metric_id
JOIN covid19.area_reference AS ar ON ar.id = tsother.area_id
WHERE metric = 'vaccinationsAgeDemographics'
AND date > ( DATE('{date}'))
UNION
(
SELECT *
FROM covid19.time_series_p{partition}_utla AS tsutla
JOIN covid19.release_reference AS rr ON rr.id = release_id
JOIN covid19.metric_reference AS mr ON mr.id = metric_id
JOIN covid19.area_reference AS ar ON ar.id = tsutla.area_id
WHERE metric = 'vaccinationsAgeDemographics'
AND date > ( DATE('{date}'))
)
UNION
(
SELECT *
FROM covid19.time_series_p{partition}_ltla AS tsltla
JOIN covid19.release_reference AS rr ON rr.id = release_id
JOIN covid19.metric_reference AS mr ON mr.id = metric_id
JOIN covid19.area_reference AS ar ON ar.id = tsltla.area_id
WHERE metric = 'vaccinationsAgeDemographics'
AND date > ( DATE('{date}'))
)
) AS tsltla
ORDER BY date DESC;\
SELECT partition_id, release_id, area_id, date, payload FROM (
SELECT partition_id, release_id, area_id, date, payload,
RANK() OVER (
PARTITION BY area_id
ORDER BY date DESC
) AS area_latest
FROM (
SELECT *
FROM covid19.time_series_p{partition}_other AS tsother
JOIN covid19.release_reference AS rr ON rr.id = release_id
JOIN covid19.metric_reference AS mr ON mr.id = metric_id
JOIN covid19.area_reference AS ar ON ar.id = tsother.area_id
WHERE metric = 'vaccinationsAgeDemographics'
AND date > ( DATE('{date}'))
UNION
(
SELECT *
FROM covid19.time_series_p{partition}_utla AS tsutla
JOIN covid19.release_reference AS rr ON rr.id = release_id
JOIN covid19.metric_reference AS mr ON mr.id = metric_id
JOIN covid19.area_reference AS ar ON ar.id = tsutla.area_id
WHERE metric = 'vaccinationsAgeDemographics'
AND date > ( DATE('{date}'))
)
UNION
(
SELECT *
FROM covid19.time_series_p{partition}_ltla AS tsltla
JOIN covid19.release_reference AS rr ON rr.id = release_id
JOIN covid19.metric_reference AS mr ON mr.id = metric_id
JOIN covid19.area_reference AS ar ON ar.id = tsltla.area_id
WHERE metric = 'vaccinationsAgeDemographics'
AND date > ( DATE('{date}'))
)
) AS ts
ORDER BY area_id
) AS foo
WHERE area_latest < 4;
"""

0 comments on commit 75ee9b0

Please sign in to comment.