
Commit

Use ArraySliceDep from dask
gjoseph92 committed Feb 1, 2022
1 parent ab51354 commit 883c8fa
Showing 3 changed files with 8 additions and 40 deletions.
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -15,8 +15,8 @@ Sphinx = {version = "^3.5.4", optional = true}
 aiohttp = {version = "^3.7.4", optional = true}
 cachetools = {version = "^4.2.2", optional = true}
 coiled = {version = "^0", optional = true}
-dask = {extras = ["array"], version = ">= 2021.4.1, < 2023"}
-distributed = {version = ">= 2021.4.1, < 2023", optional = true}
+dask = {extras = ["array"], version = "^2022.1.1"}
+distributed = {version = "^2022.1.1", optional = true}
 furo = {version = "^2021.4.11-beta.34", optional = true}
 geogif = {version = "^0", optional = true}
 ipyleaflet = {version = "^0.13.6", optional = true}
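The tightened pins use Poetry's caret operator: `^2022.1.1` is shorthand for the range `>=2022.1.1,<2023.0.0`. A quick illustrative check of that equivalence with the `packaging` library (the version numbers below are arbitrary examples, not pins from this project):

    from packaging.specifiers import SpecifierSet
    from packaging.version import Version

    # Poetry's "^2022.1.1" expands to this range:
    compatible = SpecifierSet(">=2022.1.1,<2023.0.0")

    assert Version("2022.1.1") in compatible
    assert Version("2022.6.0") in compatible      # any later 2022.x.y release satisfies it
    assert Version("2023.1.0") not in compatible  # next "major" (calendar year) is excluded
    assert Version("2021.12.0") not in compatible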
42 changes: 5 additions & 37 deletions stackstac/to_dask.py
@@ -1,14 +1,14 @@
 from __future__ import annotations
 
-import itertools
-from typing import ClassVar, Dict, Literal, Optional, Tuple, Type, Union
+from typing import Dict, Literal, Optional, Tuple, Type, Union
 import warnings
 
 from affine import Affine
 import dask
 import dask.array as da
-from dask.blockwise import BlockwiseDep, blockwise
+from dask.blockwise import blockwise
 from dask.highlevelgraph import HighLevelGraph
+from dask.layers import ArraySliceDep
 import numpy as np
 from rasterio import windows
 from rasterio.enums import Resampling
@@ -48,7 +48,7 @@ def items_to_dask(
 # The overall strategy in this function is to materialize the outer two dimensions (items, assets)
 # as one dask array (the "asset table"), then map a function over it which opens each URL as a `Reader`
 # instance (the "reader table").
-# Then, we use the `Slices` `BlockwiseDep` to represent the inner two dimensions (y, x), and
+# Then, we use the `ArraySliceDep` `BlockwiseDep` to represent the inner two dimensions (y, x), and
 # `Blockwise` to create the cartesian product between them, avoiding materializing that entire graph.
 # Materializing the (items, assets) dimensions is unavoidable: every asset has a distinct URL, so that information
 # has to be included somehow.
@@ -91,7 +91,7 @@ def items_to_dask(
"tbyx",
reader_table.name,
"tb",
Slices(chunks_yx),
ArraySliceDep(chunks_yx),
"yx",
dtype,
None,
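For readers unfamiliar with this pattern, here is a minimal, self-contained sketch of what the call above does. All names (`fetch_window`, the toy `table` and `chunks_yx`) are illustrative stand-ins, not stackstac's real `fetch_raster_window` or reader table: a small materialized (items, assets) array is combined with `ArraySliceDep` over the (y, x) chunks via the graph-level `blockwise`, so each task in the 4-D output receives the slices for its spatial window without the full graph ever being materialized.

    import dask.array as da
    import numpy as np
    from dask.blockwise import blockwise
    from dask.highlevelgraph import HighLevelGraph
    from dask.layers import ArraySliceDep

    def fetch_window(value, slices):
        # Stand-in for reading one asset's pixels within one (y, x) window.
        ys, xs = slices
        shape = (ys.stop - ys.start, xs.stop - xs.start)
        return np.full((1, 1) + shape, value.item(), dtype="float64")

    table = da.from_array(np.arange(6.0).reshape(3, 2), chunks=1)  # (items, assets)
    chunks_yx = ((2, 3), (4,))  # y split into 2 + 3 rows, x one chunk of 4

    name = "fetch-window-example"
    layer = blockwise(
        fetch_window, name, "tbyx",
        table.name, "tb",
        ArraySliceDep(chunks_yx), "yx",
        numblocks={table.name: table.numblocks},
    )
    graph = HighLevelGraph.from_collections(name, layer, [table])
    arr = da.Array(graph, name, chunks=table.chunks + chunks_yx, dtype="float64")

    assert arr.shape == (3, 2, 5, 4)
    assert arr.compute()[2, 1, 0, 0] == 5.0  # filled from table[2, 1]

Only the graph for blocks actually computed is built; the (items, assets) blocks and the (y, x) slices are combined lazily per output block.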
@@ -254,35 +254,3 @@ def window_from_bounds(bounds: Bbox, transform: Affine) -> windows.Window:
         ),
     )
     return window
-
-
-# FIXME: Get this from Dask once https://github.com/dask/dask/pull/7417 is merged!
-# The scheduler will refuse to import it without passlisting stackstac in `distributed.scheduler.allowed-imports`.
-class Slices(BlockwiseDep):
-    "Produces the slice into the full-size array corresponding to the current chunk"
-
-    starts: list[Tuple[int, ...]]
-    produces_tasks: ClassVar[bool] = False
-
-    def __init__(self, chunks: Tuple[Tuple[int, ...], ...]):
-        self.starts = [tuple(itertools.accumulate(c, initial=0)) for c in chunks]
-
-    def __getitem__(self, idx: Tuple[int, ...]) -> Tuple[slice, ...]:
-        return tuple(
-            slice(start[i], start[i + 1]) for i, start in zip(idx, self.starts)
-        )
-
-    @property
-    def numblocks(self) -> list[int]:
-        return [len(s) - 1 for s in self.starts]
-
-    def __dask_distributed_pack__(
-        self, required_indices: Optional[list[Tuple[int, ...]]] = None
-    ) -> list[Tuple[int, ...]]:
-        return self.starts
-
-    @classmethod
-    def __dask_distributed_unpack__(cls, state: list[Tuple[int, ...]]) -> Slices:
-        self = cls.__new__(cls)
-        self.starts = state
-        return self
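`ArraySliceDep`, available in the dask version pinned above, has the same semantics as this deleted class: indexing it by block index yields that chunk's slices into the full-size array. A few illustrative lines:

    from dask.layers import ArraySliceDep

    chunks = ((2, 3), (4, 4))  # a 5 x 8 array: rows chunked 2 + 3, columns 4 + 4
    dep = ArraySliceDep(chunks)

    # Indexing by block index yields that chunk's window into the full array,
    # just as the vendored Slices class computed via itertools.accumulate.
    assert dep[(0, 0)] == (slice(0, 2), slice(0, 4))
    assert dep[(1, 1)] == (slice(2, 5), slice(4, 8))

Using the dask-shipped class also sidesteps the FIXME above: the scheduler no longer needs stackstac passlisted in `distributed.scheduler.allowed-imports` to deserialize the layer.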

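As an aside, the `window_from_bounds` helper whose tail appears in the final hunk computes the same kind of result as `rasterio.windows.from_bounds`. A hedged sketch of that rasterio call, with a made-up 30 m grid transform (not values from stackstac):

    from affine import Affine
    from rasterio import windows

    # Made-up 30 m-per-pixel grid anchored at (300000, 4600000).
    transform = Affine(30.0, 0.0, 300000.0, 0.0, -30.0, 4600000.0)
    win = windows.from_bounds(
        left=300000.0, bottom=4597000.0, right=303000.0, top=4600000.0,
        transform=transform,
    )
    print(win)  # Window(col_off=0.0, row_off=0.0, width=100.0, height=100.0)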