Merge pull request #450 from astrofrog/logging
Add logging calls and fix a couple of dask-related issues
astrofrog committed Jun 27, 2024
2 parents f684098 + 2ab5757 commit 51379b1
Showing 2 changed files with 84 additions and 3 deletions.
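The new messages are emitted at INFO level through logging.getLogger(__name__), so they are silent by default. A minimal sketch (not part of this diff) of how a user could surface them using only the standard library:

import logging

# Show INFO-level records from the reproject loggers added in this commit
# (reproject.common and reproject.mosaicking.coadd).
logging.basicConfig(level=logging.INFO)
logging.getLogger("reproject").setLevel(logging.INFO)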
44 changes: 41 additions & 3 deletions reproject/common.py
@@ -1,3 +1,4 @@
import logging
import os
import tempfile
import uuid
@@ -15,10 +16,29 @@
__all__ = ["_reproject_dispatcher"]


class _ArrayContainer:
# When we set up as_delayed_memmap_path, if we pass a dask array to it,
# dask will actually compute the array before we get to the code inside
# as_delayed_memmap_path, so as a workaround we wrap any array we
# pass in using _ArrayContainer to make sure dask doesn't try and be smart.
def __init__(self, array):
self._array = array


@delayed(pure=True)
def as_delayed_memmap_path(array, tmp_dir):

# Extract array from _ArrayContainer
if isinstance(array, _ArrayContainer):
array = array._array
else:
raise TypeError("Expected _ArrayContainer in as_delayed_memmap_path")

logger = logging.getLogger(__name__)
if isinstance(array, da.core.Array):
logger.info("Computing input dask array to Numpy memory-mapped array")
array_path, _ = _dask_to_numpy_memmap(array, tmp_dir)
logger.info(f"Numpy memory-mapped array is now at {array_path}")
else:
array_path = os.path.join(tmp_dir, f"{uuid.uuid4()}.npy")
array_memmapped = np.memmap(
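For context, the _ArrayContainer workaround relies on dask.delayed unpacking and computing dask-collection arguments before the body of the delayed function runs, while an opaque wrapper object is passed through untouched. A standalone sketch of that behaviour (Opaque and describe are illustrative names, not part of reproject):

import dask
import dask.array as da

class Opaque:
    # Hypothetical stand-in for _ArrayContainer: dask does not look inside
    # arbitrary objects, so the dask array it holds is not computed eagerly.
    def __init__(self, array):
        self._array = array

@dask.delayed(pure=True)
def describe(wrapped):
    array = wrapped._array if isinstance(wrapped, Opaque) else wrapped
    return type(array).__name__

x = da.ones((4, 4), chunks=2)
print(describe(x).compute())          # 'ndarray': dask computed x before the call
print(describe(Opaque(x)).compute())  # 'Array': the dask array arrives intact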
@@ -95,6 +115,8 @@ def _reproject_dispatcher(
Whether to return numpy or dask arrays - defaults to 'numpy'.
"""

logger = logging.getLogger(__name__)

if return_type is None:
return_type = "numpy"
elif return_type not in ("numpy", "dask"):
@@ -138,7 +160,11 @@
)

if isinstance(array_in, da.core.Array):
_, array_in = _dask_to_numpy_memmap(array_in, local_tmp_dir)
logger.info("Computing input dask array to Numpy memory-mapped array")
array_path, array_in = _dask_to_numpy_memmap(array_in, local_tmp_dir)
logger.info(f"Numpy memory-mapped array is now at {array_path}")

logger.info(f"Calling {reproject_func.__name__} in non-dask mode")

try:
return reproject_func(
@@ -180,7 +206,7 @@
tmp_dir = tempfile.mkdtemp()
else:
tmp_dir = local_tmp_dir
array_in_or_path = as_delayed_memmap_path(array_in, tmp_dir)
array_in_or_path = as_delayed_memmap_path(_ArrayContainer(array_in), tmp_dir)
else:
# Here we could set array_in_or_path to array_in_path if it has
# been set previously, but in synchronous and threaded mode it is
@@ -190,7 +216,13 @@
array_in_or_path = array_in

def reproject_single_block(a, array_or_path, block_info=None):
if a.ndim == 0 or block_info is None or block_info == []:

if (
a.ndim == 0
or block_info is None
or block_info == []
or (isinstance(block_info, np.ndarray) and block_info.tolist() == [])
):
return np.array([a, a])

# The WCS class from astropy is not thread-safe, see e.g.
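The extra isinstance branch above appears to guard against block_info arriving as an empty NumPy array rather than None or an empty list, in which case comparing with == [] is elementwise rather than a plain truth test. A small illustration of the pitfall (not code from the PR):

import numpy as np

block_info = np.array([])          # what a dummy map_blocks call may pass in
print(block_info == [])            # empty boolean array, not True/False
print(block_info.tolist() == [])   # True: an unambiguous emptiness check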
@@ -262,6 +294,8 @@ def reproject_single_block(a, array_or_path, block_info=None):
array_out_dask = da.empty(shape_out)
array_out_dask = array_out_dask.rechunk(block_size_limit=64 * 1024**2, **rechunk_kwargs)

logger.info(f"Setting up output dask array with map_blocks")

result = da.map_blocks(
reproject_single_block,
array_out_dask,
@@ -297,6 +331,8 @@ def reproject_single_block(a, array_or_path, block_info=None):

zarr_path = os.path.join(local_tmp_dir, f"{uuid.uuid4()}.zarr")

logger.info(f"Computing output array directly to zarr array at {zarr_path}")

if parallel == "current-scheduler":
# Just use whatever is the current active scheduler, which can
# be used for e.g. dask.distributed
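As the comment notes, this branch defers to whatever scheduler dask currently has active, which is how a dask.distributed cluster gets picked up. A brief illustration of the usage this enables (assuming the caller passes parallel="current-scheduler"; requires the distributed package):

from dask.distributed import Client

# Creating a Client registers the cluster as dask's current scheduler;
# reproject calls made with parallel="current-scheduler" then execute
# their dask graphs on it.
client = Client()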
@@ -317,6 +353,8 @@ def reproject_single_block(a, array_or_path, block_info=None):

result = da.from_zarr(zarr_path)

logger.info(f"Copying output zarr array into output Numpy arrays")

if return_footprint:
da.store(
[result[0], result[1]],
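The surrounding code follows a common dask pattern: compute the block-wise result straight into an on-disk zarr store, reopen it lazily, and copy it into pre-allocated output arrays with da.store. A self-contained sketch of that pattern (paths and shapes are illustrative; requires the zarr package):

import os
import tempfile

import dask.array as da
import numpy as np

zarr_path = os.path.join(tempfile.mkdtemp(), "example.zarr")

result = da.random.random((1024, 1024), chunks=256)  # stand-in for the map_blocks output
result.to_zarr(zarr_path)                            # computes directly into the store
reloaded = da.from_zarr(zarr_path)                   # lazy view of the stored result

output = np.empty(reloaded.shape)                    # pre-allocated NumPy output
da.store(reloaded, output)                           # copy chunks into the output array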
43 changes: 43 additions & 0 deletions reproject/mosaicking/coadd.py
@@ -3,6 +3,7 @@
import sys
import tempfile
import uuid
from logging import getLogger

import numpy as np
from astropy.wcs import WCS
@@ -143,6 +144,8 @@ def reproject_and_coadd(
# up memory usage. We could probably still have references to array
# objects, but we'd just make sure these were memory mapped

logger = getLogger(__name__)

# Validate inputs

if combine_function not in ("mean", "sum", "first", "last", "min", "max"):
@@ -176,12 +179,19 @@
f"the output shape {shape_out}"
)

logger.info(f"Output mosaic will have shape {shape_out}")

# Define 'on-the-fly' mode: in the case where we don't need to match
# the backgrounds and we are combining with 'mean' or 'sum', we don't
# have to keep track of the intermediate arrays and can just modify
# the output array on-the-fly
on_the_fly = not match_background and combine_function in ("mean", "sum")

on_the_fly_prefix = "Using" if on_the_fly else "Not using"
logger.info(
f"{on_the_fly_prefix} on-the-fly mode for adding individual reprojected images to output array"
)

# Start off by reprojecting individual images to the final projection

if not on_the_fly:
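For reference, the on-the-fly mode described in the comment above accumulates each reprojected tile straight into the running output and footprint instead of keeping a list of tiles. A simplified, unweighted sketch of the 'mean' case (the real function works on memory-mapped sub-views of the mosaic and may differ in detail):

import numpy as np

def coadd_mean_on_the_fly(tiles, footprints, shape_out):
    # Accumulate directly into the output so each tile can be discarded
    # as soon as it has been added.
    output_array = np.zeros(shape_out)
    output_footprint = np.zeros(shape_out)
    for array, footprint in zip(tiles, footprints):
        array = np.where(footprint > 0, array, 0.0)  # zero outside the footprint
        output_array += array
        output_footprint += footprint
    with np.errstate(invalid="ignore"):
        output_array /= output_footprint  # mean; never-covered pixels become NaN
    return output_array, output_footprint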
@@ -190,6 +200,9 @@
with tempfile.TemporaryDirectory(ignore_cleanup_errors=IS_WIN) as local_tmp_dir:

for idata in progress_bar(range(len(input_data))):

logger.info(f"Processing input data {idata + 1} of {len(input_data)}")

# We need to pre-parse the data here since we need to figure out how to
# optimize/minimize the size of each output tile (see below).
array_in, wcs_in = parse_input_data(input_data[idata], hdu_in=hdu_in)
@@ -238,6 +251,9 @@
break

if skip_data:
logger.info(
"Skipping reprojection as no predicted overlap with final mosaic header"
)
continue

slice_out = tuple([slice(imin, imax) for (imin, imax) in bounds])
@@ -265,6 +281,10 @@

array_path = os.path.join(local_tmp_dir, f"array_{uuid.uuid4()}.np")

logger.info(
f"Creating memory-mapped array with shape {shape_out_indiv} at {array_path}"
)

array = np.memmap(
array_path,
shape=shape_out_indiv,
@@ -274,6 +294,10 @@

footprint_path = os.path.join(local_tmp_dir, f"footprint_{uuid.uuid4()}.np")

logger.info(
f"Creating memory-mapped footprint with shape {shape_out_indiv} at {footprint_path}"
)

footprint = np.memmap(
footprint_path,
shape=shape_out_indiv,
@@ -285,6 +309,8 @@

array = footprint = None

logger.info(f"Calling {reproject_function.__name__} with shape_out={shape_out_indiv}")

array, footprint = reproject_function(
(array_in, wcs_in),
output_projection=wcs_out_indiv,
@@ -301,6 +327,10 @@

weights_path = os.path.join(local_tmp_dir, f"weights_{uuid.uuid4()}.np")

logger.info(
f"Creating memory-mapped weights with shape {shape_out_indiv} at {weights_path}"
)

weights = np.memmap(
weights_path,
shape=shape_out_indiv,
@@ -312,6 +342,10 @@

weights = None

logger.info(
f"Calling {reproject_function.__name__} with shape_out={shape_out_indiv} for weights"
)

weights = reproject_function(
(weights_in, wcs_in),
output_projection=wcs_out_indiv,
@@ -335,6 +369,7 @@

if intermediate_memmap:
# Remove the reference to the memmap before trying to remove the file itself
logger.info(f"Removing memory-mapped weight array")
weights = None
try:
os.remove(weights_path)
@@ -347,6 +382,7 @@
# output image is empty (due e.g. to no overlap).

if on_the_fly:
logger.info(f"Adding reprojected array to final array")
# By default, values outside of the footprint are set to NaN
# but we set these to 0 here to avoid getting NaNs in the
# means/sums.
@@ -362,6 +398,7 @@
if intermediate_memmap:
# Remove the references to the memmaps themselves before
# trying to remove the files themselves.
logger.info("Removing memory-mapped array and footprint arrays")
array = None
footprint = None
try:
@@ -371,10 +408,12 @@
pass

else:
logger.info(f"Adding reprojected array to list to combine later")
arrays.append(array)

# If requested, try and match the backgrounds.
if match_background and len(arrays) > 1:
logger.info(f"Match backgrounds")
offset_matrix = determine_offset_matrix(arrays)
corrections = solve_corrections_sgd(offset_matrix)
if background_reference:
@@ -384,6 +423,7 @@

if combine_function in ("mean", "sum"):
if match_background:
logger.info(f"Combining reprojected arrays with function {combine_function}")
# if we're not matching the background, this part has already been done
for array in arrays:
# By default, values outside of the footprint are set to NaN
@@ -394,10 +434,12 @@
output_footprint[array.view_in_original_array] += array.footprint

if combine_function == "mean":
logger.info(f"Handle normalization of output array")
with np.errstate(invalid="ignore"):
output_array /= output_footprint

elif combine_function in ("first", "last", "min", "max"):
logger.info(f"Combining reprojected arrays with function {combine_function}")
if combine_function == "min":
output_array[...] = np.inf
elif combine_function == "max":
@@ -433,6 +475,7 @@ def reproject_and_coadd(

# We need to avoid potentially large memory allocation from output == 0 so
# we operate in chunks.
logger.info(f"Resetting invalid pixels to {blank_pixel_value}")
for chunk in iterate_chunks(output_array.shape, max_chunk_size=256 * 1024**2):
output_array[chunk][output_footprint[chunk] == 0] = blank_pixel_value
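The chunked loop above avoids allocating a full-size output_footprint == 0 boolean mask in one go. Since iterate_chunks is a reproject-internal helper, here is a generic sketch of the same idea using plain row bands (function and parameter names are illustrative):

import numpy as np

def reset_blank_pixels(output_array, output_footprint, blank_pixel_value, rows_per_chunk=1024):
    # Work one band of rows at a time so the boolean mask is only ever
    # allocated for a single band rather than the whole mosaic.
    for start in range(0, output_array.shape[0], rows_per_chunk):
        rows = slice(start, start + rows_per_chunk)
        output_array[rows][output_footprint[rows] == 0] = blank_pixel_value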
