Merge pull request #929 from keflavich/how_ray_slice_is_ok
Regression fix: if how='slice' or 'ray', we do _not_ want to warn
e-koch authored Jan 17, 2025
2 parents e98b6c3 + 6eeb9cc commit 410189f
Showing 7 changed files with 59 additions and 39 deletions.
4 changes: 3 additions & 1 deletion CHANGES.rst
@@ -1,6 +1,8 @@
 0.6.6.dev (unreleased)
 ----------------------
--
+- Warning behavior changed to no longer warn about dask cubes loading into
+  memory. Dask 'slow' warnings are now limited to the necessarily single-
+  stream examples: median, percentile, and mad_std. #929

 0.6.5 (2023-12-05)
 ----------------------
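For downstream code, the practical upshot of this entry is that the remaining dask 'slow' warnings arrive under a dedicated category and can be filtered; a minimal sketch (the FITS filename is hypothetical):

```python
import warnings
from spectral_cube import SpectralCube
from spectral_cube.utils import PossiblySlowWarning

# Opt out of the remaining single-stream 'slow' warnings
warnings.simplefilter('ignore', PossiblySlowWarning)

cube = SpectralCube.read('example_cube.fits', use_dask=True)  # hypothetical file
med = cube.median()  # axis=None would otherwise emit a PossiblySlowWarning
```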
24 changes: 17 additions & 7 deletions spectral_cube/dask_spectral_cube.py
@@ -6,6 +6,7 @@
 import inspect
 import warnings
 import tempfile
+import textwrap

 from functools import wraps
 from contextlib import contextmanager
@@ -25,7 +26,7 @@
 from . import wcs_utils
 from .spectral_cube import SpectralCube, VaryingResolutionSpectralCube, SIGMA2FWHM, np2wcs
-from .utils import cached, VarianceWarning, SliceWarning, BeamWarning, SmoothingWarning, BeamUnitsError
+from .utils import cached, VarianceWarning, SliceWarning, BeamWarning, SmoothingWarning, BeamUnitsError, PossiblySlowWarning
 from .lower_dimensional_structures import Projection
 from .masks import BooleanArrayMask, is_broadcastable_and_smaller
 from .np_compat import allbadtonan
@@ -67,6 +68,18 @@ def wrapper(self, *args, **kwargs):
     return wrapper


+def _warn_slow_dask(functionname):
+    """
+    Dask has a different 'slow' warning than non-dask. It is only expected to
+    be slow for statistics that require sorting (and possibly cube arithmetic).
+    """
+    warnings.warn(message=textwrap.dedent(f"""
+        Dask requires loading the whole cube into memory for {functionname}
+        calculations. This may result in slow computation.
+        """).strip(),
+        category=PossiblySlowWarning)
+
+
 def add_save_to_tmp_dir_option(function):

     @wraps(function)
@@ -626,7 +639,6 @@ def mean(self, axis=None, **kwargs):
         return self._compute(da.nanmean(self._get_filled_data(fill=np.nan), axis=axis, **kwargs))

     @projection_if_needed
-    @ignore_warnings
     def median(self, axis=None, **kwargs):
         """
         Return the median of the cube, optionally over an axis.
@@ -636,13 +648,12 @@
         if axis is None:
             # da.nanmedian raises NotImplementedError since it is not possible
             # to do efficiently, so we use Numpy instead.
-            self._warn_slow('median')
+            _warn_slow_dask('median')
             return np.nanmedian(self._compute(data), **kwargs)
         else:
             return self._compute(da.nanmedian(self._get_filled_data(fill=np.nan), axis=axis, **kwargs))

     @projection_if_needed
-    @ignore_warnings
     def percentile(self, q, axis=None, **kwargs):
         """
         Return percentiles of the data.
@@ -660,7 +671,7 @@
         if axis is None:
             # There is no way to compute the percentile of the whole array in
             # chunks.
-            self._warn_slow('percentile')
+            _warn_slow_dask('percentile')
             return np.nanpercentile(data, q, **kwargs)
         else:
             # Rechunk so that there is only one chunk along the desired axis
@@ -683,7 +694,6 @@ def std(self, axis=None, ddof=0, **kwargs):
         return self._compute(da.nanstd(self._get_filled_data(fill=np.nan), axis=axis, ddof=ddof, **kwargs))

     @projection_if_needed
-    @ignore_warnings
     def mad_std(self, axis=None, ignore_nan=True, **kwargs):
         """
         Use astropy's mad_std to compute the standard deviation
@@ -694,7 +704,7 @@
         if axis is None:
             # In this case we have to load the full data - even dask's
             # nanmedian doesn't work efficiently over the whole array.
-            self._warn_slow('mad_std')
+            _warn_slow_dask('mad_std')
             return stats.mad_std(data, ignore_nan=ignore_nan, **kwargs)
         else:
             # Rechunk so that there is only one chunk along the desired axis
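As a quick sanity check of the new behavior, only the single-stream statistics should warn when computed over the whole cube; a sketch assuming a small dask-backed cube (the filename is hypothetical):

```python
import warnings
from spectral_cube import SpectralCube

cube = SpectralCube.read('small_cube.fits', use_dask=True)  # hypothetical file

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    cube.mean()          # chunked reduction: no 'slow' warning expected
    cube.median(axis=0)  # per-axis median stays in dask: no warning
    cube.median()        # axis=None falls back to numpy: warns

# the median(axis=None) PossiblySlowWarning should be among these
print([str(w.message) for w in caught])
```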
14 changes: 8 additions & 6 deletions spectral_cube/io/casa_image.py
@@ -103,12 +103,14 @@ def load_casa_image(filename, skipdata=False, memmap=True,
     elif 'beams' in beam_:
         bdict = beam_['beams']

-    # NOTE: temp to check failing test
-    print(desc['_keywords_']['coords'])
-    print(list(desc['_keywords_']['coords'].keys()))
-
-
-    stokes_params = desc['_keywords_']['coords']['stokes1']['stokes']
+    # check whether stokes1 or stokes2 is present; if not, assume stokes_params = ['I']
+    if 'stokes1' in desc['_keywords_']['coords']:
+        stokes_params = desc['_keywords_']['coords']['stokes1']['stokes']
+    elif 'stokes2' in desc['_keywords_']['coords']:
+        stokes_params = desc['_keywords_']['coords']['stokes2']['stokes']
+    else:
+        warnings.warn("No stokes information found in CASA image. Assuming Stokes I.")
+        stokes_params = ['I']

     nbeams = len(bdict)
     nchan = beam_['nChannels']
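The lookup order can be exercised in isolation; a standalone sketch (pick_stokes and the dicts are illustrative, not part of the module):

```python
import warnings

def pick_stokes(coords):
    # Mirrors the fallback above: prefer 'stokes1', then 'stokes2', else Stokes I
    if 'stokes1' in coords:
        return coords['stokes1']['stokes']
    elif 'stokes2' in coords:
        return coords['stokes2']['stokes']
    warnings.warn("No stokes information found in CASA image. Assuming Stokes I.")
    return ['I']

print(pick_stokes({'stokes1': {'stokes': ['I', 'Q']}}))  # -> ['I', 'Q']
print(pick_stokes({}))                                   # warns, -> ['I']
```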
14 changes: 12 additions & 2 deletions spectral_cube/spectral_cube.py
@@ -948,7 +948,6 @@ def _apply_everywhere(self, function, *args, check_units=True):

         return self._new_cube_with(data=data, unit=new_unit)

-    @warn_slow
     def _cube_on_cube_operation(self, function, cube, equivalencies=[], **kwargs):
         """
         Apply an operation between two cubes. Inherits the metadata of the
@@ -1438,7 +1437,11 @@ def unmasked_data(self, view):
         # Astropy Quantities don't play well with dask arrays with shape ()
         if isinstance(values, da.Array) and values.shape == ():
             values = values.compute()
-        return u.Quantity(values, self.unit, copy=False)
+        try:
+            return u.Quantity(values, self.unit, copy=False)
+        except ValueError:
+            warnings.warn("The data were copied; it was not possible to create a view on the data")
+            return u.Quantity(values, self.unit)

     def unmasked_copy(self):
         """
@@ -2286,6 +2289,7 @@ def __ne__(self, value):
         value = self._val_to_own_unit(value)
         return LazyComparisonMask(operator.ne, value, data=self._data, wcs=self._wcs)

+    @warn_slow
     def __add__(self, value):
         if isinstance(value, BaseSpectralCube):
             return self._cube_on_cube_operation(operator.add, value)
@@ -2294,6 +2298,7 @@ def __add__(self, value):
                                             keepunit=False)
             return self._apply_everywhere(operator.add, value, check_units=False)

+    @warn_slow
     def __sub__(self, value):
         if isinstance(value, BaseSpectralCube):
             return self._cube_on_cube_operation(operator.sub, value)
@@ -2302,21 +2307,25 @@ def __sub__(self, value):
                                             tofrom='from', keepunit=False)
             return self._apply_everywhere(operator.sub, value, check_units=False)

+    @warn_slow
     def __mul__(self, value):
         if isinstance(value, BaseSpectralCube):
             return self._cube_on_cube_operation(operator.mul, value)
         else:
             return self._apply_everywhere(operator.mul, value)

+    @warn_slow
     def __truediv__(self, value):
         return self.__div__(value)

+    @warn_slow
     def __div__(self, value):
         if isinstance(value, BaseSpectralCube):
             return self._cube_on_cube_operation(operator.truediv, value)
         else:
             return self._apply_everywhere(operator.truediv, value)

+    @warn_slow
     def __floordiv__(self, value):
         raise NotImplementedError("Floor-division (division with truncation) "
                                   "is not supported.")
@@ -2338,6 +2347,7 @@ def __floordiv__(self, value):
     #     raise NotImplementedError("Floor-division (division with truncation) "
     #                               "is not supported.")

+    @warn_slow
     def __pow__(self, value):
         if isinstance(value, BaseSpectralCube):
             return self._cube_on_cube_operation(operator.pow, value)
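With @warn_slow moved from _cube_on_cube_operation onto the individual operators, every arithmetic entry point is now gated the same way; a sketch of the expected behavior for a non-dask cube (filenames hypothetical; per the updated tests, a small cube no longer warns on arithmetic):

```python
from spectral_cube import SpectralCube

cube = SpectralCube.read('small_cube.fits')  # hypothetical file

# Small cube: scalar arithmetic now proceeds without the old 'slow' warning
shifted = cube + 5 * cube.unit

# Huge cube: the same line raises ValueError unless explicitly permitted:
# cube.allow_huge_operations = True
# shifted = cube + 5 * cube.unit
```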
10 changes: 7 additions & 3 deletions spectral_cube/tests/test_dask.py
@@ -106,8 +106,8 @@ def test_statistics(data_adv):

 def test_statistics_withnans(data_adv):
     cube = DaskSpectralCube.read(data_adv).rechunk(chunks=(1, 2, 3))
-    # shape is 2, 3, 4
-    cube._data[:,:,:2] = np.nan
+    # shape is 4, 3, 2 for adv
+    cube._data[:2,:,:] = np.nan
     # ensure some chunks are all nan
     cube.rechunk((1,2,2))
     stats = cube.statistics()
@@ -266,7 +266,11 @@ def test_cube_on_cube(filename, request):
     # since they are not SpectralCube subclasses
     cube = DaskSpectralCube.read(dataname)
     assert isinstance(cube, (DaskSpectralCube, DaskVaryingResolutionSpectralCube))
-    cube2 = SpectralCube.read(dataname, use_dask=False)
+    if 'image' in filename:
+        # no choice - non-dask can't read casa images
+        cube2 = SpectralCube.read(dataname)
+    else:
+        cube2 = SpectralCube.read(dataname, use_dask=False)
     if 'image' not in filename:
         # 'image' would be CASA and must be dask
         assert not isinstance(cube2, (DaskSpectralCube, DaskVaryingResolutionSpectralCube))
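The rechunk to (1, 2, 2) deliberately leaves some chunks containing only NaNs; a standalone sketch of why that case is worth covering (plain dask, not spectral-cube):

```python
import numpy as np
import dask.array as da

arr = np.arange(24, dtype=float).reshape(4, 3, 2)
arr[:2, :, :] = np.nan                       # blank the first half of axis 0
darr = da.from_array(arr, chunks=(1, 2, 2))  # several chunks are now all-NaN

# nan-aware reductions must still return finite values where data exist
print(da.nanmean(darr, axis=0).compute())
```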
22 changes: 6 additions & 16 deletions spectral_cube/tests/test_spectral_cube.py
@@ -103,17 +103,6 @@ def cube_and_raw(filename, use_dask=None):
     return c, d


-def test_arithmetic_warning(data_vda_jybeam_lower, recwarn, use_dask):
-
-    cube, data = cube_and_raw(data_vda_jybeam_lower, use_dask=use_dask)
-
-    assert not cube._is_huge
-
-    # make sure the small cube raises a warning about loading into memory
-    with pytest.warns(UserWarning, match='requires loading the entire'):
-        cube + 5*cube.unit
-
-
 def test_huge_disallowed(data_vda_jybeam_lower, use_dask):

     cube, data = cube_and_raw(data_vda_jybeam_lower, use_dask=use_dask)
@@ -130,13 +119,13 @@ def test_huge_disallowed(data_vda_jybeam_lower, use_dask):

     assert cube._is_huge

-    with pytest.raises(ValueError, match='entire cube into memory'):
-        cube + 5*cube.unit
-
     if use_dask:
-        with pytest.raises(ValueError, match='entire cube into memory'):
+        with pytest.warns(UserWarning, match='whole cube into memory'):
             cube.mad_std()
     else:
         with pytest.raises(ValueError, match='entire cube into memory'):
+            cube + 5*cube.unit
+
+        with pytest.raises(ValueError, match='entire cube into memory'):
             cube.max(how='cube')

@@ -1536,7 +1525,8 @@ def test_oned_collapse_beams(data_sdav_beams, use_dask):
     spec = cube.mean(axis=(1,2))
     assert isinstance(spec, VaryingResolutionOneDSpectrum)
     # data has a redundant 1st axis
-    np.testing.assert_equal(spec.value, data.mean(axis=(1,2,3)))
+    # we changed to assert_almost_equal in 2025 because, for no known reason, epsilon-level differences crept in
+    np.testing.assert_almost_equal(spec.value, np.nanmean(data, axis=(1,2,3)))
     assert cube.unit == spec.unit
     assert spec.header['BUNIT'] == cube.header['BUNIT']
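For reference, np.testing.assert_almost_equal compares to 7 decimal places by default, so it tolerates exactly the epsilon-level drift described in the comment above:

```python
import numpy as np

a = np.array([1.0, 2.0])
b = a + 1e-9  # epsilon-level difference

np.testing.assert_almost_equal(a, b)  # passes: default is decimal=7
# np.testing.assert_equal(a, b)       # would fail: exact comparison
```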
10 changes: 6 additions & 4 deletions spectral_cube/utils.py
@@ -38,8 +38,10 @@ def wrapper(self, *args, **kwargs):
         accepts_how_keyword = 'how' in argspec.args or argspec.varkw == 'how'

         warn_how = accepts_how_keyword and ((kwargs.get('how') == 'cube') or 'how' not in kwargs)

-        if self._is_huge and not self.allow_huge_operations:
+        loads_whole_cube = not (kwargs.get('how') in ('slice', 'ray'))
+
+        if self._is_huge and not self.allow_huge_operations and loads_whole_cube:
             warn_message = ("This function ({0}) requires loading the entire "
                             "cube into memory, and the cube is large ({1} "
                             "pixels), so by default we disable this operation. "
@@ -50,11 +52,11 @@
             warn_message += ("Alternatively, you may want to consider using an "
                              "approach that does not load the whole cube into "
                              "memory by specifying how='slice' or how='ray'. ")

             warn_message += ("See {bigdataurl} for details.".format(bigdataurl=bigdataurl))

             raise ValueError(warn_message)
-        elif warn_how and not self._is_huge:
+        elif warn_how and not self._is_huge and loads_whole_cube:
             # TODO: add check for whether cube has been loaded into memory
             warnings.warn("This function ({0}) requires loading the entire cube into "
                           "memory and may therefore be slow.".format(str(function)),
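The net effect of loads_whole_cube is that how='slice' and how='ray' now bypass both the huge-cube ValueError and the 'may be slow' warning; a minimal sketch of the gate in isolation (the gate function and its names are illustrative):

```python
def gate(kwargs, is_huge, allow_huge, warn_how):
    # 'slice' and 'ray' stream over the cube, so they skip the gate entirely
    loads_whole_cube = kwargs.get('how') not in ('slice', 'ray')
    if is_huge and not allow_huge and loads_whole_cube:
        return 'raise ValueError'
    elif warn_how and not is_huge and loads_whole_cube:
        return 'warn'
    return 'proceed'

print(gate({'how': 'slice'}, is_huge=True, allow_huge=False, warn_how=True))  # proceed
print(gate({'how': 'cube'}, is_huge=True, allow_huge=False, warn_how=True))   # raise ValueError
print(gate({}, is_huge=False, allow_huge=False, warn_how=True))               # warn
```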
