Initial implementation of np.unique(return_index=True) #1138

Status: Open. Wants to merge 3 commits into base: branch-24.03.
21 changes: 18 additions & 3 deletions cunumeric/array.py
@@ -4152,7 +4152,9 @@ def view(
writeable=self._writeable,
)

def unique(self) -> ndarray:
def unique(
self, return_index: bool = False
) -> Union[ndarray, tuple[ndarray, ndarray]]:
"""a.unique()

Find the unique elements of an array.
@@ -4168,8 +4170,21 @@ def unique(self) -> ndarray:
Multiple GPUs, Multiple CPUs

"""
thunk = self._thunk.unique()
return ndarray(shape=thunk.shape, thunk=thunk)
deferred_result = self._thunk.unique(return_index)
if return_index:
if TYPE_CHECKING:
deferred_result = cast(
tuple[NumPyThunk, NumPyThunk], deferred_result
)
return ndarray(
shape=deferred_result[0].shape, thunk=deferred_result[0]
), ndarray(
shape=deferred_result[1].shape, thunk=deferred_result[1]
)
else:
if TYPE_CHECKING:
deferred_result = cast(NumPyThunk, deferred_result)
return ndarray(shape=deferred_result.shape, thunk=deferred_result)

@classmethod
def _get_where_thunk(
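For context, the new return_index path is meant to mirror numpy.unique semantics: the second array holds the index of the first occurrence of each unique value in the flattened input. A minimal usage sketch (assuming the usual cunumeric import alias; not part of this diff):

import cunumeric as np  # cunumeric mirrors the NumPy module API

a = np.array([1, 3, 1, 2, 3, 2])
values, indices = np.unique(a, return_index=True)
# values  -> array([1, 2, 3])
# indices -> first-occurrence positions in the flattened input,
#            here array([0, 3, 1]), so a[indices] equals values
print(values, indices)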
2 changes: 2 additions & 0 deletions cunumeric/config.py
@@ -212,6 +212,7 @@ class _CunumericSharedLib:
CUNUMERIC_UNARY_RED: int
CUNUMERIC_UNIQUE: int
CUNUMERIC_UNIQUE_REDUCE: int
CUNUMERIC_UNZIP_INDICES: int
CUNUMERIC_UNLOAD_CUDALIBS: int
CUNUMERIC_UNPACKBITS: int
CUNUMERIC_UOP_ABSOLUTE: int
@@ -378,6 +379,7 @@ class CuNumericOpCode(IntEnum):
UNARY_RED = _cunumeric.CUNUMERIC_UNARY_RED
UNIQUE = _cunumeric.CUNUMERIC_UNIQUE
UNIQUE_REDUCE = _cunumeric.CUNUMERIC_UNIQUE_REDUCE
UNZIP = _cunumeric.CUNUMERIC_UNZIP_INDICES
UNLOAD_CUDALIBS = _cunumeric.CUNUMERIC_UNLOAD_CUDALIBS
UNPACKBITS = _cunumeric.CUNUMERIC_UNPACKBITS
WHERE = _cunumeric.CUNUMERIC_WHERE
69 changes: 60 additions & 9 deletions cunumeric/deferred.py
@@ -3487,25 +3487,76 @@ def scan(
assert self.shape == swapped.shape
self.copy(swapped, deep=True)

def unique(self) -> NumPyThunk:
result = self.runtime.create_unbound_thunk(self.base.type)

def unique(
self, return_index: bool = False
) -> Union[NumPyThunk, tuple[NumPyThunk, Optional[NumPyThunk]]]:
task = self.context.create_auto_task(CuNumericOpCode.UNIQUE)

task.add_output(result.base)
task.add_input(self.base)
task.add_scalar_arg(return_index, ty.bool_)

result = None
# Assuming legate core will always choose GPU variant
Review comment (Member): [nit]: Add a comment as to why the branches here are creating the thunk in different ways (i.e. different types).

Review comment (Member): Extra nit -- comments should be complete sentences, so punctuation and periods please.

# CPU uses legate.core Reduce op, which requires storing indices in struct
if self.runtime.num_gpus > 0:
task.add_nccl_communicator()
result = self.runtime.create_unbound_thunk(self.base.type)
elif return_index:
result = self.runtime.create_unbound_thunk(
ty.struct_type(
[
self.base.type,
ty.int64,
],
True,
)
)
else:
result = self.runtime.create_unbound_thunk(self.base.type)
task.add_output(result.base)

returned_indices = None
if return_index:
# GPU variant uses NCCL for reduction so can directly output indices
if self.runtime.num_gpus > 0:
returned_indices = self.runtime.create_unbound_thunk(ty.int64)
task.add_output(returned_indices.base)
Review comment (Member): Again, comment this asymmetry between the CPU and GPU implementations.


for i in range(self.ndim):
task.add_scalar_arg(self.shape[i], ty.int32)

task.execute()

if self.runtime.num_gpus == 0 and self.runtime.num_procs > 1:
result.base = self.context.tree_reduce(
CuNumericOpCode.UNIQUE_REDUCE, result.base
)
if self.runtime.num_gpus == 0:
if self.runtime.num_procs > 1:
result.base = self.context.tree_reduce(
CuNumericOpCode.UNIQUE_REDUCE,
result.base,
scalar_args=[(return_index, ty.bool_)],
)
if return_index:
task = self.context.create_auto_task(CuNumericOpCode.UNZIP)
task.add_input(result.base)
Review comment (Member): This is an unclear way of writing the code: adding result.base as input, overwriting result, and then adding result.base as output.


return result
result = self.runtime.create_empty_thunk(
result.shape, self.base.type
)
returned_indices = self.runtime.create_empty_thunk(
result.shape, ty.int64
)

task.add_output(result.base)

returned_indices = cast(DeferredArray, returned_indices)
task.add_output(returned_indices.base)
task.add_alignment(result.base, returned_indices.base)

task.execute()

if return_index:
return result, returned_indices
else:
return result

@auto_convert("rhs", "v")
def searchsorted(self, rhs: Any, v: Any, side: SortSide = "left") -> None:
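To summarize the data flow the deferred path above implements when return_index is requested without GPUs: each worker deduplicates its chunk into (value, index) pairs, a tree reduction merges the pair sets (presumably keeping the smallest index per value, so the result matches NumPy's first-occurrence semantics), and a final UNZIP task splits the merged pairs into the two output arrays. The plain-Python sketch below shows that algorithm only; the task names come from the diff, but everything else here is illustrative, not the actual legate plumbing:

def local_unique(chunk, offset):
    # Per-worker UNIQUE step: dedup a chunk, keeping the earliest
    # global index for each value.
    pairs = {}
    for i, v in enumerate(chunk):
        pairs.setdefault(v, offset + i)
    return pairs

def merge_pairs(a, b):
    # UNIQUE_REDUCE step: merge two pair sets, keeping the smaller index.
    merged = dict(a)
    for v, i in b.items():
        merged[v] = min(merged.get(v, i), i)
    return merged

def unzip(pairs):
    # UNZIP_INDICES step: split sorted (value, index) pairs into
    # the two final arrays.
    items = sorted(pairs.items())
    return [v for v, _ in items], [i for _, i in items]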
16 changes: 13 additions & 3 deletions cunumeric/eager.py
@@ -1713,11 +1713,21 @@ def scan(
else:
raise RuntimeError(f"unsupported scan op {op}")

def unique(self) -> NumPyThunk:
def unique(
self, return_index: bool = False
) -> Union[NumPyThunk, tuple[NumPyThunk, Optional[NumPyThunk]]]:
if self.deferred is not None:
return self.deferred.unique()
return self.deferred.unique(return_index=return_index)
else:
return EagerArray(self.runtime, np.unique(self.array))
if return_index:
np_values, np_indices = np.unique(
self.array, return_index=return_index
)
return EagerArray(self.runtime, np_values), EagerArray(
self.runtime, np_indices
)
else:
return EagerArray(self.runtime, np.unique(self.array))

def create_window(self, op_code: WindowOpCode, M: int, *args: Any) -> None:
if self.deferred is not None:
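Because the eager fallback above delegates straight to np.unique, its results should match NumPy exactly; a small parity check one could add to the tests (a sketch, assuming cunumeric arrays convert through np.asarray):

import numpy as np
import cunumeric as cn

a_np = np.array([4, 4, 0, 1, 0])
v_np, i_np = np.unique(a_np, return_index=True)
v_cn, i_cn = cn.unique(cn.array(a_np), return_index=True)

assert np.array_equal(v_np, np.asarray(v_cn))
assert np.array_equal(i_np, np.asarray(i_cn))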
9 changes: 5 additions & 4 deletions cunumeric/module.py
@@ -6803,7 +6803,7 @@ def unique(
return_inverse: bool = False,
return_counts: bool = False,
axis: Optional[int] = None,
) -> ndarray:
) -> Union[ndarray, tuple[ndarray, ndarray]]:
"""

Find the unique elements of an array.
@@ -6868,12 +6868,13 @@
`axis` is also not handled currently.

"""
if _builtin_any((return_index, return_inverse, return_counts, axis)):
if _builtin_any((return_inverse, return_counts, axis)):
raise NotImplementedError(
"Keyword arguments for `unique` are not yet supported"
"Keyword arguments for `unique` outside"
" of return_index are not yet supported"
)

return ar.unique()
return ar.unique(return_index)


##################################
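As the updated check in module.py implies, only return_index is now forwarded; return_inverse, return_counts, and axis still raise. A quick sketch of the resulting call behavior (hypothetical session, not part of the diff):

import cunumeric as np

a = np.array([2, 1, 2])
np.unique(a)                      # fine: plain unique values
np.unique(a, return_index=True)   # fine: enabled by this PR
np.unique(a, return_counts=True)  # still raises NotImplementedError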
4 changes: 3 additions & 1 deletion cunumeric/thunk.py
@@ -723,7 +723,9 @@ def scan(
...

@abstractmethod
def unique(self) -> NumPyThunk:
def unique(
self, return_index: bool = False
) -> Union[NumPyThunk, tuple[NumPyThunk, Optional[NumPyThunk]]]:
...

@abstractmethod
2 changes: 2 additions & 0 deletions cunumeric_cpp.cmake
@@ -164,6 +164,7 @@ list(APPEND cunumeric_SOURCES
src/cunumeric/search/nonzero.cc
src/cunumeric/set/unique.cc
src/cunumeric/set/unique_reduce.cc
src/cunumeric/set/unzip_indices.cc
src/cunumeric/stat/bincount.cc
src/cunumeric/convolution/convolve.cc
src/cunumeric/transform/flip.cc
@@ -217,6 +218,7 @@ if(Legion_USE_OpenMP)
src/cunumeric/search/nonzero_omp.cc
src/cunumeric/set/unique_omp.cc
src/cunumeric/set/unique_reduce_omp.cc
src/cunumeric/set/unzip_indices_omp.cc
src/cunumeric/stat/bincount_omp.cc
src/cunumeric/convolution/convolve_omp.cc
src/cunumeric/transform/flip_omp.cc
1 change: 1 addition & 0 deletions src/cunumeric/cunumeric_c.h
@@ -72,6 +72,7 @@ enum CuNumericOpCode {
CUNUMERIC_UNARY_RED,
CUNUMERIC_UNIQUE,
CUNUMERIC_UNIQUE_REDUCE,
CUNUMERIC_UNZIP_INDICES,
CUNUMERIC_UNLOAD_CUDALIBS,
CUNUMERIC_UNPACKBITS,
CUNUMERIC_WHERE,
3 changes: 2 additions & 1 deletion src/cunumeric/mapper.cc
@@ -105,7 +105,8 @@ std::vector<StoreMapping> CuNumericMapper::store_mappings(
}
case CUNUMERIC_MATMUL:
case CUNUMERIC_MATVECMUL:
case CUNUMERIC_UNIQUE_REDUCE: {
case CUNUMERIC_UNIQUE_REDUCE:
case CUNUMERIC_UNZIP_INDICES: {
Review comment (Member): Based on looking at the implementation of unzip_indices below, you don't need any special mapping for it and should fall through to the default case at the bottom here.

// TODO: Our actual requirements are a little less strict than this; we require each array or
// vector to have a stride of 1 on at least one dimension.
std::vector<StoreMapping> mappings;
37 changes: 27 additions & 10 deletions src/cunumeric/set/unique.cc
@@ -25,25 +25,42 @@ template <Type::Code CODE, int32_t DIM>
struct UniqueImplBody<VariantKind::CPU, CODE, DIM> {
using VAL = legate_type_of<CODE>;

void operator()(Array& output,
void operator()(std::vector<Array>& outputs,
const AccessorRO<VAL, DIM>& in,
const Pitches<DIM - 1>& pitches,
const Rect<DIM>& rect,
const size_t volume,
const std::vector<comm::Communicator>& comms,
const DomainPoint& point,
const Domain& launch_domain)
const Domain& launch_domain,
const bool return_index,
const DomainPoint& parent_point)
{
std::set<VAL> dedup_set;
auto& output = outputs[0];
if (return_index) {
std::set<ZippedIndex<VAL>, IndexEquality<VAL>> dedup_set;
for (size_t idx = 0; idx < volume; ++idx) {
auto p = pitches.unflatten(idx, rect.lo);
auto value = in[p];
int64_t index = rowwise_linearize(DIM, p, parent_point);

for (size_t idx = 0; idx < volume; ++idx) {
auto p = pitches.unflatten(idx, rect.lo);
dedup_set.insert(in[p]);
}
dedup_set.insert(ZippedIndex<VAL>({value, index}));
}

auto result = output.create_output_buffer<ZippedIndex<VAL>, 1>(dedup_set.size(), true);
size_t pos = 0;
for (auto e : dedup_set) { result[pos++] = e; }
} else {
std::set<VAL> dedup_set;
for (size_t idx = 0; idx < volume; ++idx) {
auto p = pitches.unflatten(idx, rect.lo);
dedup_set.insert(in[p]);
}

auto result = output.create_output_buffer<VAL, 1>(dedup_set.size(), true);
size_t pos = 0;
for (auto e : dedup_set) result[pos++] = e;
auto result = output.create_output_buffer<VAL, 1>(dedup_set.size(), true);
size_t pos = 0;
for (auto e : dedup_set) result[pos++] = e;
}
}
};
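The CPU kernel above leans on rowwise_linearize to turn a local point into a global row-major flat index, which is how per-worker indices stay meaningful after the reduction. A Python sketch of what such a linearization computes (an assumption about the helper, based on its name and the parent shape passed as scalar args; the real helper lives in the C++ sources):

def rowwise_linearize(point, parent_shape):
    # Row-major (C-order) flat index of point in an array of
    # shape parent_shape.
    index = 0
    for coord, extent in zip(point, parent_shape):
        index = index * extent + coord
    return index

# Example: point (1, 2) in a 3 x 4 parent -> 1 * 4 + 2 = 6.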
