From 9b77e5599aa48578a5167463881aa47a2b79f9ec Mon Sep 17 00:00:00 2001 From: Fred Thomas Date: Thu, 20 Jul 2023 16:30:16 +0100 Subject: [PATCH 1/2] change serialisation of empty slices (of nodes or edges) change from completely empty dataframe to empty geodataframe (no rows) but with a minimal column set (geometry). this is necessary as geopandas/pyarrow behaviour on opening empty geodataframe has changed. --- src/open_gira/io.py | 1 - workflow/scripts/concat_and_sum_slices.py | 16 +++------- workflow/scripts/osm_to_pq.py | 32 ++++++++++++------- .../scripts/transport/create_rail_network.py | 30 +++++------------ .../scripts/transport/create_road_network.py | 18 ++++------- 5 files changed, 39 insertions(+), 58 deletions(-) diff --git a/src/open_gira/io.py b/src/open_gira/io.py index d1afff5a..ad2f57e1 100644 --- a/src/open_gira/io.py +++ b/src/open_gira/io.py @@ -15,7 +15,6 @@ from open_gira.utils import natural_sort -NO_GEOM_ERROR_MSG: str = "No geometry columns are included in the columns" WGS84_EPSG = 4326 # column names and dtypes for STORM synthetic tropical cyclone tracks diff --git a/workflow/scripts/concat_and_sum_slices.py b/workflow/scripts/concat_and_sum_slices.py index 46cbf4dd..dc805105 100644 --- a/workflow/scripts/concat_and_sum_slices.py +++ b/workflow/scripts/concat_and_sum_slices.py @@ -15,7 +15,6 @@ import pandas as pd from tqdm import tqdm -from open_gira.io import NO_GEOM_ERROR_MSG from open_gira.utils import natural_sort @@ -38,18 +37,11 @@ dataframes: list[gpd.GeoDataFrame] = [] for i, slice_path in tqdm(enumerate(slice_files)): - try: - gdf = gpd.read_parquet(slice_path) + gdf = gpd.read_parquet(slice_path) - except ValueError as error: - if NO_GEOM_ERROR_MSG in str(error): - # if the input parquet file does not contain a geometry column, - # geopandas will raise a ValueError rather than try to procede. 
we
-        # catch that here, but check the error message - to be more
-        # specific than catching and suppressing any ValueError
-
-        # use an empty geodataframe to append instead
-        gdf = gpd.GeoDataFrame([])
+    if gdf.empty is True:
+        # use an empty geodataframe to append instead
+        gdf = gpd.GeoDataFrame([], columns=["geometry"])
 
     dataframes.append(gdf)
 
diff --git a/workflow/scripts/osm_to_pq.py b/workflow/scripts/osm_to_pq.py
index 92918691..8045025e 100644
--- a/workflow/scripts/osm_to_pq.py
+++ b/workflow/scripts/osm_to_pq.py
@@ -201,22 +201,28 @@ def get_node_by_coords(self, coords, prefix, node_list):
         }
 
 
+def empty_gdf() -> geopandas.GeoDataFrame:
+    """
+    Create and return an empty GeoDataFrame. Must explicitly specify columns
+    (despite empty list) to permit saving as geoparquet.
+    """
+    return geopandas.GeoDataFrame([], columns=["geometry"])
+
+
 if __name__ == "__main__":
     try:
         pbf_path = snakemake.input["pbf"]  # type: ignore
         edges_path = snakemake.output["edges"]  # type: ignore
         nodes_path = snakemake.output["nodes"]  # type: ignore
         keep_tags = snakemake.params["keep_tags"]  # type: ignore
-        osm_epsg = snakemake.config["osm_epsg"]  # type: ignore
     except NameError:
         # If "snakemake" doesn't exist then must be running from the
         # command line.
- pbf_path, edges_path, nodes_path, keep_tags, osm_epsg = sys.argv[1:] + pbf_path, edges_path, nodes_path, keep_tags = sys.argv[1:] # pbf_path = 'results/slices/tanzania-mini_filter-road/slice-2.osm.pbf' # edges_path = 'results/slice-2.geoparquet' # nodes_path = 'results/slice-2.geoparquet' # keep_tags = 'highway, railway' - # osm_epsg = 4326 # process comma separated string into list of strings keep_tags: list = keep_tags.replace(" ", "").split(",") @@ -243,7 +249,12 @@ def get_node_by_coords(self, coords, prefix, node_list): tags_to_preserve=keep_tags, ) h.apply_file(pbf_path, locations=True) - edges = geopandas.GeoDataFrame(h.output_data) + + if len(h.output_data) != 0: + edges = geopandas.GeoDataFrame(h.output_data) + edges = edges.set_crs(epsg=4326) + else: + edges = empty_gdf() logging.info( f"Complete: {len(h.output_data)} segments from {len(Counter(w['osm_way_id'] for w in h.output_data))} ways." ) @@ -252,14 +263,13 @@ def get_node_by_coords(self, coords, prefix, node_list): tags_to_preserve=keep_tags, ) n.apply_file(pbf_path, locations=True) - nodes = geopandas.GeoDataFrame(n.output_data) - logging.info(f"Complete: {len(n.output_data)} nodes.") + if len(n.output_data) != 0: + nodes = geopandas.GeoDataFrame(n.output_data) + nodes = nodes.set_crs(epsg=4326) + else: + nodes = empty_gdf() - # can't set a CRS on an empty dataframe, will AttributeError - if not edges.empty: - edges.set_crs(epsg=osm_epsg, inplace=True) - if not nodes.empty: - nodes.set_crs(epsg=osm_epsg, inplace=True) + logging.info(f"Complete: {len(n.output_data)} nodes.") # write to disk -- even if empty edges.to_parquet(edges_path) diff --git a/workflow/scripts/transport/create_rail_network.py b/workflow/scripts/transport/create_rail_network.py index 1c352535..e60f7806 100644 --- a/workflow/scripts/transport/create_rail_network.py +++ b/workflow/scripts/transport/create_rail_network.py @@ -14,7 +14,7 @@ from utils import annotate_country, get_administrative_data from open_gira.assets import 
RailAssets -from open_gira.io import write_empty_frames, NO_GEOM_ERROR_MSG +from open_gira.io import write_empty_frames from open_gira.network import create_network from open_gira.utils import str_to_bool @@ -98,29 +98,15 @@ def get_rehab_costs(row: pd.Series, rehab_costs: pd.DataFrame) -> float: warnings.filterwarnings("ignore", message=".*initial implementation of Parquet.*") # read edges - try: - edges = gpd.read_parquet(osm_edges_path) - except ValueError as error: - if NO_GEOM_ERROR_MSG in str(error): - logging.info("No data in geometry column, writing empty files.") - # if the input parquet file does not contain a geometry column, geopandas - # will raise a ValueError rather than try to procede - write_empty_frames(edges_output_path, nodes_output_path) - sys.exit(0) # exit gracefully so snakemake will continue - else: - raise error + edges = gpd.read_parquet(osm_edges_path) + if edges.empty is True: + write_empty_frames(edges_output_path, nodes_output_path) + sys.exit(0) # exit gracefully so snakemake will continue # read nodes - try: - nodes = gpd.read_parquet(osm_nodes_path) - except ValueError as error: - if NO_GEOM_ERROR_MSG in str(error): - logging.info(f"No nodes from OSM to process") - # if the input parquet file does not contain a geometry column, geopandas - # will raise a ValueError rather than try to procede - nodes = None - else: - raise error + nodes = gpd.read_parquet(osm_nodes_path) + if nodes.empty is True: + nodes = None # osm_to_pq.py creates these columns but we're not using them, so discard edges = edges.drop( diff --git a/workflow/scripts/transport/create_road_network.py b/workflow/scripts/transport/create_road_network.py index 5bd3f06c..3c72d286 100644 --- a/workflow/scripts/transport/create_road_network.py +++ b/workflow/scripts/transport/create_road_network.py @@ -16,7 +16,7 @@ from utils import annotate_country, cast, get_administrative_data, strip_suffix from open_gira.assets import RoadAssets -from open_gira.io import 
write_empty_frames, NO_GEOM_ERROR_MSG +from open_gira.io import write_empty_frames from open_gira.network import create_network from open_gira.utils import str_to_bool @@ -412,17 +412,11 @@ def get_rehab_costs(row: pd.Series, rehab_costs: pd.DataFrame) -> float: # NB though that .geoparquet is not the format to use for archiving. warnings.filterwarnings("ignore", message=".*initial implementation of Parquet.*") - try: - edges = gpd.read_parquet(osm_edges_path) - except ValueError as error: - if NO_GEOM_ERROR_MSG in str(error): - logging.info("No data in geometry column, writing empty files.") - # if the input parquet file does not contain a geometry column, geopandas - # will raise a ValueError rather than try to procede - write_empty_frames(edges_output_path, nodes_output_path) - sys.exit(0) # exit gracefully so snakemake will continue - else: - raise error + edges = gpd.read_parquet(osm_edges_path) + + if edges.empty is True: + write_empty_frames(edges_output_path, nodes_output_path) + sys.exit(0) # exit gracefully so snakemake will continue # osm_to_pq.py creates these columns but we're not using them, so discard edges = edges.drop( From 8139a5ec6cdf394cba7fc74dfa6ca1038154c80d Mon Sep 17 00:00:00 2001 From: Fred Thomas Date: Thu, 20 Jul 2023 16:37:03 +0100 Subject: [PATCH 2/2] add slice count as param, triggers rerun on change --- workflow/rules/preprocess/create_bbox_extracts.smk | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/workflow/rules/preprocess/create_bbox_extracts.smk b/workflow/rules/preprocess/create_bbox_extracts.smk index b2d74de5..28c21c99 100644 --- a/workflow/rules/preprocess/create_bbox_extracts.smk +++ b/workflow/rules/preprocess/create_bbox_extracts.smk @@ -3,6 +3,10 @@ rule create_bbox_extracts: conda: "../../../environment.yml" input: "{OUTPUT_DIR}/json/{DATASET}.json", + params: + # include slice_count as a param (despite not using elsewhere in the + # rule) to trigger re-runs on change to this configuration option + slice_count = 
config["slice_count"] output: # double curly braces allow us to expand but keep wildcards! expand(