Skip to content

Commit

Permalink
Add seed parameter to stochastic functions and support passing seed to
Browse files Browse the repository at this point in the history
  `MapReduceJob`, `PowersetSampler` and `concurrent.futures.Executor` based methods.
  • Loading branch information
Markus Semmler committed Aug 12, 2023
1 parent e42c304 commit 116f62a
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 40 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
[PR #382](https://github.com/appliedAI-Initiative/pyDVL/pull/382)
- Decouple ray.init from ParallelConfig
[PR #383](https://github.com/appliedAI-Initiative/pyDVL/pull/383)
- Add seed parameter to Shapley algorithms and support passing `seed` to
`MapReduceJob`, `PowersetSampler` and `concurrent.futures.Executor` based methods.
[PR #396](https://github.com/appliedAI-Initiative/pyDVL/pull/396)

## 0.6.1 - 🏗 Bug fixes and small improvement

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pyDeprecate>=0.3.2
numpy>=1.20
numpy>=1.24
pandas>=1.3
scikit-learn
scipy>=1.7.0
Expand Down
34 changes: 27 additions & 7 deletions src/pydvl/utils/numeric.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
from __future__ import annotations

import numbers
from itertools import chain, combinations
from typing import Collection, Generator, Iterator, Optional, Tuple, TypeVar, overload

Expand All @@ -21,6 +22,8 @@
"top_k_value_accuracy",
]

from pydvl.utils.types import SeedOrGenerator

T = TypeVar("T", bound=np.generic)


Expand Down Expand Up @@ -59,21 +62,29 @@ def num_samples_permutation_hoeffding(eps: float, delta: float, u_range: float)
return int(np.ceil(np.log(2 / delta) * 2 * u_range**2 / eps**2))


def random_subset(
    s: NDArray[T],
    q: float = 0.5,
    seed: SeedOrGenerator = None,
) -> NDArray[T]:
    """Returns one subset at random from ``s``.

    Every element of ``s`` is included independently with probability ``q``.

    :param s: set to sample from
    :param q: Sampling probability for elements. The default 0.5 yields a
        uniform distribution over the power set of s.
    :param seed: Seed for the random number generator. It is handed to
        ``np.random.default_rng``, so passing the same seed reproduces the
        same subset.
    :return: the subset
    """
    rng = np.random.default_rng(seed)
    # uniform() draws from the half-open interval [0, 1), so ``< q`` keeps
    # each element with probability exactly q: q=0 gives the empty set and
    # q=1 gives all of ``s``.
    selection = rng.uniform(size=len(s)) < q
    return s[selection]


def random_powerset(
s: NDArray[T], n_samples: Optional[int] = None, q: float = 0.5
s: NDArray[T],
n_samples: Optional[int] = None,
q: float = 0.5,
seed: SeedOrGenerator = None,
) -> Generator[NDArray[T], None, None]:
"""Samples subsets from the power set of the argument, without
pre-generating all subsets and in no order.
Expand All @@ -91,6 +102,7 @@ def random_powerset(
Defaults to `np.iinfo(np.int32).max`
:param q: Sampling probability for elements. The default 0.5 yields a
uniform distribution over the power set of s.
:param seed: Seed for the random number generator.
:return: Samples from the power set of s
:raises: TypeError: if the data `s` is not a NumPy array
Expand All @@ -106,26 +118,34 @@ def random_powerset(
if n_samples is None:
n_samples = np.iinfo(np.int32).max
while total <= n_samples:
yield random_subset(s, q)
yield random_subset(s, q, seed=seed)
total += 1


def random_subset_of_size(
    s: NDArray[T],
    size: int,
    seed: SeedOrGenerator = None,
) -> NDArray[T]:
    """Samples a random subset of the given size from ``s``.

    Elements are drawn without replacement, so every subset of ``s`` with
    exactly ``size`` elements is equally likely.

    :param s: Set to sample from
    :param size: Size of the subset to generate
    :param seed: Seed for the random number generator. It is handed to
        ``np.random.default_rng``, so passing the same seed reproduces the
        same subset.
    :return: The subset
    :raises ValueError: If size > len(s)
    """
    if size > len(s):
        raise ValueError("Cannot sample subset larger than set")
    rng = np.random.default_rng(seed)
    return rng.choice(s, size=size, replace=False)


def random_matrix_with_condition_number(n: int, condition_number: float) -> NDArray:
def random_matrix_with_condition_number(
n: int,
condition_number: float,
) -> NDArray:
"""Constructs a square matrix with a given condition number.
Taken from:
Expand Down
24 changes: 21 additions & 3 deletions src/pydvl/utils/parallel/map_reduce.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import numbers
from itertools import accumulate, repeat
from typing import Any, Collection, Dict, Generic, List, Optional, TypeVar, Union

from joblib import Parallel, delayed
from numpy.random import SeedSequence
from numpy.typing import NDArray

from ..config import ParallelConfig
from ..types import MapFunction, ReduceFunction, maybe_add_argument
from ..types import MapFunction, ReduceFunction, Seed, check_seed, maybe_add_argument
from .backend import init_parallel_backend

__all__ = ["MapReduceJob"]
Expand Down Expand Up @@ -37,6 +39,8 @@ class MapReduceJob(Generic[T, R]):
:param config: Instance of :class:`~pydvl.utils.config.ParallelConfig`
with cluster address, number of cpus, etc.
:param n_jobs: Number of parallel jobs to run. Does not accept 0
:param seed: Seed (an int or a ``SeedSequence``) used to spawn one child seed
per chunk, which is passed to the map function as ``seed``. If None is
passed, no seed parameter will be passed to the map function.
:Examples:
Expand Down Expand Up @@ -78,6 +82,7 @@ def __init__(
*,
n_jobs: int = -1,
timeout: Optional[float] = None,
seed: Seed = None,
):
self.config = config
parallel_backend = init_parallel_backend(self.config)
Expand All @@ -95,6 +100,7 @@ def __init__(

self._map_func = maybe_add_argument(map_func, "job_id")
self._reduce_func = reduce_func
self._seed = check_seed(seed)

def __call__(
self,
Expand All @@ -108,10 +114,22 @@ def __call__(
verbose = 50 - self.config.logging_level
with Parallel(backend=backend, n_jobs=self.n_jobs, verbose=verbose) as parallel:
chunks = self._chunkify(self.inputs_, n_chunks=self.n_jobs)

# Allow functions which don't accept or need a seed parameter.
lst_add_kwargs: List[Dict[str, Union[int, SeedSequence]]] = [
{"job_id": j} for j in range(len(chunks))
]
if self._seed is not None:
lst_add_kwargs = [
{**d, **{"seed": seed}}
for d, seed in zip(lst_add_kwargs, self._seed.spawn(len(chunks)))
]

map_results: List[R] = parallel(
delayed(self._map_func)(next_chunk, job_id=j, **self.map_kwargs)
for j, next_chunk in enumerate(chunks)
delayed(self._map_func)(next_chunk, **add_kwargs, **self.map_kwargs)
for next_chunk, add_kwargs in zip(chunks, lst_add_kwargs)
)

reduce_results: R = self._reduce_func(map_results, **self.reduce_kwargs)
return reduce_results

Expand Down
34 changes: 32 additions & 2 deletions src/pydvl/utils/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@
transformations. Some of it probably belongs elsewhere.
"""
import inspect
from typing import Any, Callable, Protocol, TypeVar
import numbers
from typing import Any, Callable, Optional, Protocol, TypeVar, Union

import numpy as np
from numpy.random import SeedSequence
from numpy.typing import NDArray

__all__ = ["SupervisedModel", "MapFunction", "ReduceFunction"]
__all__ = [
"SupervisedModel",
"MapFunction",
"ReduceFunction",
"Seed",
"SeedOrGenerator",
"check_seed",
]

R = TypeVar("R", covariant=True)

Expand Down Expand Up @@ -64,3 +74,23 @@ def wrapper(*args, **kwargs):
return fun(*args, **kwargs)

return wrapper


Seed = Optional[Union[int, np.random.SeedSequence]]
SeedOrGenerator = Union[Seed, np.random.Generator]


def check_seed(seed: Seed, return_none: bool = True) -> Optional[SeedSequence]:
    """Normalize a seed into a ``SeedSequence``.

    :param seed: ``None``, an integer (Python ``int`` or any other
        ``numbers.Integral`` such as a NumPy integer scalar), or an existing
        ``SeedSequence``.
    :param return_none: Only relevant when ``seed`` is ``None``. If True,
        ``None`` is returned; if False, a fresh ``SeedSequence()`` without
        user-provided entropy is returned instead.
    :return: ``None`` (see ``return_none``), a new ``SeedSequence`` built
        from an integer seed, or ``seed`` itself unchanged for any other
        value.
    """
    if seed is None:
        return None if return_none else SeedSequence()

    # numbers.Integral also matches NumPy integer scalars (e.g. np.int64),
    # which a plain ``isinstance(seed, int)`` check lets through unconverted.
    if isinstance(seed, numbers.Integral):
        return SeedSequence(int(seed))

    # Anything else is assumed to already be a SeedSequence; hand it back.
    return seed
13 changes: 10 additions & 3 deletions src/pydvl/value/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import collections.abc
import logging
import numbers
from dataclasses import dataclass
from functools import total_ordering
from numbers import Integral
Expand All @@ -52,7 +53,6 @@
Literal,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
cast,
Expand All @@ -66,6 +66,7 @@
from pydvl.utils.dataset import Dataset
from pydvl.utils.numeric import running_moments
from pydvl.utils.status import Status
from pydvl.utils.types import SeedOrGenerator

try:
import pandas # Try to import here for the benefit of mypy
Expand Down Expand Up @@ -657,7 +658,11 @@ def to_dataframe(

@classmethod
def from_random(
cls, size: int, total: Optional[float] = None, **kwargs
cls,
size: int,
total: Optional[float] = None,
seed: SeedOrGenerator = None,
**kwargs,
) -> "ValuationResult":
"""Creates a :class:`ValuationResult` object and fills it with an array
of random values from a uniform distribution in [-1,1]. The values can
Expand All @@ -666,6 +671,7 @@ def from_random(
:param size: Number of values to generate
:param total: If set, the values are normalized to sum to this number
("efficiency" property of Shapley values).
:param seed: Seed for the random number generator.
:param kwargs: Additional options to pass to the constructor of
:class:`ValuationResult`. Use to override status, names, etc.
:return: A valuation result with its status set to
Expand All @@ -678,7 +684,8 @@ def from_random(
if size < 1:
raise ValueError("Size must be a positive integer")

values = np.random.uniform(low=-1, high=1, size=size)
rng = np.random.default_rng(seed)
values = rng.uniform(low=-1, high=1, size=size)
if total is not None:
values *= total / np.sum(values)

Expand Down
27 changes: 22 additions & 5 deletions src/pydvl/value/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,23 @@
:class:`UniformSampler`. In contrast, slicing a :class:`PermutationSampler`
creates a new sampler which iterates over the same indices.
"""

from __future__ import annotations

import abc
import math
import numbers
from enum import Enum
from itertools import permutations
from typing import Generic, Iterable, Iterator, Sequence, Tuple, TypeVar, overload
from typing import (
Generic,
Iterable,
Iterator,
Optional,
Sequence,
Tuple,
TypeVar,
overload,
)

import numpy as np
from numpy.typing import NDArray
Expand All @@ -50,6 +59,8 @@
"UniformSampler",
]

from pydvl.utils.types import SeedOrGenerator

T = TypeVar("T", bound=np.generic)
SampleType = Tuple[T, NDArray[T]]
Sequence.register(np.ndarray)
Expand Down Expand Up @@ -90,6 +101,7 @@ def __init__(
indices: NDArray[T],
index_iteration: IndexIteration = IndexIteration.Sequential,
outer_indices: NDArray[T] = None,
seed: SeedOrGenerator = None,
):
"""
:param indices: The set of items (indices) to sample from.
Expand All @@ -98,12 +110,14 @@ def __init__(
when sampling. Subsets are taken from the complement of each index
in succession. For embarrassingly parallel computations, this set
is sliced and the samplers are used to iterate over the slices.
:param seed: Seed for the random number generator.
"""
self._indices = indices
self._index_iteration = index_iteration
self._outer_indices = outer_indices if outer_indices is not None else indices
self._n = len(indices)
self._n_samples = 0
self._rng = np.random.default_rng(seed)

@property
def indices(self) -> NDArray[T]:
Expand Down Expand Up @@ -135,7 +149,10 @@ def iterindices(self) -> Iterator[T]:
yield idx
elif self._index_iteration is PowersetSampler.IndexIteration.Random:
while True:
yield np.random.choice(self._outer_indices, size=1).item()
yield self._rng.choice(self._outer_indices, size=1).item()

def seed(self, seed: SeedOrGenerator = None) -> None:
    """Re-seed the sampler by replacing its random number generator.

    :param seed: Seed for the new random number generator; it is passed
        to ``np.random.default_rng``.
    """
    self._rng = np.random.default_rng(seed)

@overload
def __getitem__(self, key: slice) -> "PowersetSampler[T]":
Expand Down Expand Up @@ -255,7 +272,7 @@ class PermutationSampler(PowersetSampler[T]):

def __iter__(self) -> Iterator[SampleType]:
while True:
permutation = np.random.permutation(self._indices)
permutation = self._rng.permutation(self._indices)
for i, idx in enumerate(permutation):
yield idx, permutation[:i]
self._n_samples += 1
Expand Down Expand Up @@ -300,7 +317,7 @@ class RandomHierarchicalSampler(PowersetSampler[T]):
def __iter__(self) -> Iterator[SampleType]:
while True:
for idx in self.iterindices():
k = np.random.choice(np.arange(len(self._indices)), size=1).item()
k = self._rng.choice(np.arange(len(self._indices)), size=1).item()
subset = random_subset_of_size(self.complement([idx]), size=k)
yield idx, subset
self._n_samples += 1
Expand Down
Loading

0 comments on commit 116f62a

Please sign in to comment.