Merge branch '4-draft-data-validation-as-dag' into 'main'
Draft data validation as DAG

Closes noi-techpark#4

See merge request u-hopper/projects/industrial/open-data-hub-bz/bdp-elaborations!13
Marco Angheben committed Feb 21, 2024
2 parents 1c36e70 + 8e83dc2 commit 57df5da
Showing 10 changed files with 335 additions and 153 deletions.
40 changes: 21 additions & 19 deletions pollution_v2/README.md
@@ -191,25 +191,27 @@ airflow tasks test tutorial_of_mines print_date 2015-06-01

#### List of environmental variables for development

| Name | Required | Description | Default | Suggested development value |
|----------------------------------------|----------------|-------------|------------|-----------------------------|
| AIRFLOW_HOME | Yes | Airflow home. | - | <your_local_path>/pollution_v2/airflow |
| AIRFLOW_VAR_ODH_BASE_READER_URL | Yes | The base URL for the ODH requests for reading data. | - | https://mobility.api.opendatahub.testingmachine.eu |
| AIRFLOW_VAR_ODH_BASE_WRITER_URL | Yes | The base URL for the ODH requests for writing data. | - | https://do.nothing |
| AIRFLOW_VAR_ODH_AUTHENTICATION_URL | Yes | The URL for the ODH authentication endpoints. | - | https://auth.opendatahub.testingmachine.eu/auth/ |
| AIRFLOW_VAR_ODH_USERNAME | Yes | The username for the ODH authentication. | - | [email protected] |
| AIRFLOW_VAR_ODH_PASSWORD | Yes | The password for the ODH authentication. | - | US9wVZYvDhxHK76TKKA9hxiMB4bTJ6 |
| AIRFLOW_VAR_ODH_CLIENT_ID | Yes | The client ID for the ODH authentication. | - | uhopper-elaborations |
| AIRFLOW_VAR_ODH_CLIENT_SECRET | Yes | The client secret for the ODH authentication. | - | OeIXRAgHCwMOJ4qz0duaXgvvt32fBWGp |
| AIRFLOW_VAR_ODH_GRANT_TYPE | Yes | The token grant type for the ODH authentication. Multiple types can be specified by separating them with `;`. | "password" | client_credentials |
| AIRFLOW_VAR_ODH_PAGINATION_SIZE | No | The pagination size for the GET requests to ODH. Set it to `-1` to disable pagination. | 200 | 10000 |
| AIRFLOW_VAR_ODH_MAX_POST_BATCH_SIZE | No | The maximum batch size for each POST request to ODH. If not set, there is no maximum and all data will be sent in a single call. | - | 10000 |
| AIRFLOW_VAR_ODH_COMPUTATION_BATCH_SIZE | No | The maximum size (in days) of a batch to compute. | 30 | |
| AIRFLOW_VAR_ODH_MINIMUM_STARTING_DATE | No | The minimum starting date[time] in ISO format (up to one-second precision; ODH does not support milliseconds in the from-date field) for downloading data from ODH if no pollution measures are available. | 2018-01-01 | |
| COMPUTATION_CHECKPOINT_REDIS_HOST | No | The Redis host for the computation checkpoints. Set it to enable computation checkpoints. | | |
| COMPUTATION_CHECKPOINT_REDIS_PORT | No | The port of the Redis server for the computation checkpoints. | 6379 | |
| COMPUTATION_CHECKPOINT_REDIS_DB | No | The DB number of the checkpoint Redis server. | 0 | |
| DAG_POLLUTION_EXECUTION_CRONTAB | No | The crontab used to schedule the pollution computation. | 0 0 * * * | |
| DAG_VALIDATION_EXECUTION_CRONTAB | No | The crontab used to schedule the data validation. | 0 0 * * * | |
| NO_PROXY | Yes (on macOS) | Sets every URL to skip the proxy (see [here](https://docs.python.org/3/library/urllib.request.html)). | - | * |
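Airflow resolves `AIRFLOW_VAR_*` environment variables into its Variables, which is why the names above carry that prefix. Below is a minimal sketch of that lookup convention; it is simplified (real Airflow also consults its metadata database and any configured secrets backends), and the values used are taken from the table's examples:

```python
import os
from typing import Optional


def get_airflow_variable(name: str, default: Optional[str] = None) -> Optional[str]:
    # Airflow's Variable.get("FOO") can be served by the AIRFLOW_VAR_FOO env var
    return os.environ.get(f"AIRFLOW_VAR_{name.upper()}", default)


os.environ["AIRFLOW_VAR_ODH_PAGINATION_SIZE"] = "10000"
print(get_airflow_variable("ODH_PAGINATION_SIZE", "200"))        # set via env: 10000
print(get_airflow_variable("ODH_COMPUTATION_BATCH_SIZE", "30"))  # unset: falls back to 30
```

Note that values arrive as strings; code such as `settings.py` below casts them (e.g. with `int(...)`) as needed.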

### Notes on Docker deployment

2 changes: 1 addition & 1 deletion pollution_v2/src/common/data_model/__init__.py
@@ -1,6 +1,6 @@
# SPDX-FileCopyrightText: NOI Techpark <[email protected]>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
from .common import VehicleClass, MeasureCollection, Measure, Provenance, DataType
from .common import VehicleClass, MeasureCollection, Measure, Provenance, DataType, StationLatestMeasure, Station
from .traffic import TrafficSensorStation, TrafficMeasure, TrafficMeasureCollection, TrafficEntry
from .pollution import PollutionEntry, PollutantClass, PollutionMeasure, PollutionMeasureCollection
2 changes: 1 addition & 1 deletion pollution_v2/src/common/data_model/common.py
@@ -161,7 +161,7 @@ def from_json(cls, dict_data) -> Station:


@dataclass
class StationLatestMeasure():
class StationLatestMeasure:

    def __init__(self, station_code, latest_time):
        self.station_code = station_code
Empty file.
71 changes: 71 additions & 0 deletions pollution_v2/src/common/manager/traffic_station.py
@@ -0,0 +1,71 @@
# SPDX-FileCopyrightText: NOI Techpark <[email protected]>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

from __future__ import absolute_import, annotations

import itertools
import logging
from datetime import datetime
from typing import List

from common.connector.collector import ConnectorCollector
from common.data_model import TrafficMeasureCollection, TrafficSensorStation, StationLatestMeasure
from common.settings import ODH_MINIMUM_STARTING_DATE

logger = logging.getLogger("common.manager.traffic_station")


class TrafficStationManager:

    def __init__(self, connector_collector: ConnectorCollector):
        self._connector_collector = connector_collector
        self._traffic_stations: List[TrafficSensorStation] = []

    def get_traffic_stations_from_cache(self) -> List[TrafficSensorStation]:
        if len(self._traffic_stations) == 0:
            logger.info("Retrieving station list from ODH")
            self._traffic_stations = self._get_station_list()
        return self._traffic_stations

    def get_all_latest_measures(self) -> List[StationLatestMeasure]:
        """
        Returns a list of stations, each with its latest measure date.
        :return: List of stations with their latest measure dates.
        """
        all_measures = self._connector_collector.traffic.get_latest_measures()

        # itertools.groupby only merges consecutive items, so sort by the
        # grouping key first to avoid splitting a station across groups
        grouped = {}
        sorted_measures = sorted(all_measures, key=lambda m: m.station.code)
        for station_code, values in itertools.groupby(sorted_measures, lambda m: m.station.code):
            tmp = list(values)
            if len(tmp) > 0:
                grouped[station_code] = tmp

        res = []
        for station_code, measures in grouped.items():
            res.append(StationLatestMeasure(station_code,
                                            max((m.valid_time for m in measures),
                                                default=ODH_MINIMUM_STARTING_DATE)))

        return res

    def _get_station_list(self) -> List[TrafficSensorStation]:
        """
        Retrieve the list of all the available stations.
        """
        return self._connector_collector.traffic.get_station_list()

    def _download_traffic_data(self,
                               from_date: datetime,
                               to_date: datetime,
                               traffic_station: TrafficSensorStation
                               ) -> TrafficMeasureCollection:
        """
        Download traffic data measures in the given interval.
        :param from_date: Traffic measures before this date are discarded if no latest pollution measure is available.
        :param to_date: Traffic measures after this date are discarded.
        :param traffic_station: The station whose measures are downloaded.
        :return: The resulting TrafficMeasureCollection containing the traffic data.
        """
        return TrafficMeasureCollection(
            measures=self._connector_collector.traffic.get_measures(
                from_date=from_date, to_date=to_date, station=traffic_station))
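The latest-measure-per-station grouping in `get_all_latest_measures` can be sketched with plain tuples (the station codes and dates here are made up for illustration). Note that `itertools.groupby` only merges consecutive runs of equal keys, hence the sort by the same key first:

```python
import itertools
from datetime import datetime

# Hypothetical flat list of (station_code, valid_time) records
measures = [
    ("A22-1", datetime(2024, 1, 2)),
    ("A22-2", datetime(2024, 1, 5)),
    ("A22-1", datetime(2024, 1, 4)),
]

# groupby only merges consecutive runs, so sort by the grouping key first
measures.sort(key=lambda m: m[0])
latest = {
    code: max(valid_time for _, valid_time in group)
    for code, group in itertools.groupby(measures, key=lambda m: m[0])
}
# latest now maps each station code to its most recent measure date
```

Without the sort, the two `A22-1` records would land in separate groups and one of them would be silently dropped when building the dict.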
1 change: 1 addition & 0 deletions pollution_v2/src/common/settings.py
@@ -34,6 +34,7 @@
ODH_COMPUTATION_BATCH_SIZE = int(Variable.get("ODH_COMPUTATION_BATCH_SIZE", 30))
ODH_MINIMUM_STARTING_DATE = dateutil.parser.parse(Variable.get("ODH_MINIMUM_STARTING_DATE", "2018-01-01"))
DAG_POLLUTION_EXECUTION_CRONTAB = Variable.get("DAG_POLLUTION_EXECUTION_CRONTAB", "0 0 * * *")
DAG_VALIDATION_EXECUTION_CRONTAB = Variable.get("DAG_VALIDATION_EXECUTION_CRONTAB", "0 0 * * *")
DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN = int(Variable.get("DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN", 24))

# General
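The new `DAG_VALIDATION_EXECUTION_CRONTAB` setting mirrors the existing pollution one and defaults to `0 0 * * *` (daily at midnight). A minimal, illustrative sanity check for such values follows; `looks_like_crontab` is a hypothetical helper, not part of the codebase, and Airflow performs its own, much stricter schedule validation:

```python
def looks_like_crontab(expr: str) -> bool:
    """Rough check only: a standard crontab schedule has exactly five fields."""
    return len(expr.split()) == 5


assert looks_like_crontab("0 0 * * *")    # minute hour day month weekday
assert not looks_like_crontab("0 0 * *")  # one field short
```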