Skip to content
This repository has been archived by the owner on Feb 17, 2023. It is now read-only.

ENH: Implemented max, min and sum operators #72

Merged
merged 1 commit into from
Aug 21, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions biggus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,102 @@ def process_data(self, data):
pass


class _MinStreamsHandler(_AggregationStreamsHandler):
    """
    Streams handler computing the minimum of non-masked data along
    ``self.axis``.

    Assumes, like the sum handlers in this module, that process_data() can
    be called more than once between bootstrap() and finalise(); each call
    is folded into the result rather than replacing it.

    """
    # True once a call to process_data() has contributed to ``self.result``
    # since the last bootstrap().
    _has_data = False

    def bootstrap(self, shape):
        """Reset the handler ready to build a new result of shape `shape`."""
        # Zero-filled placeholder; it is replaced (never combined with) the
        # first piece of data, as zero is not a neutral element for min.
        self.result = np.zeros(shape, dtype=self.array.dtype)
        self._has_data = False

    def finalise(self):
        """Return the current result wrapped in a Chunk."""
        array = self.result
        # Promote array-scalar to 0-dimensional array.
        if array.ndim == 0:
            array = np.array(array)
        return Chunk(self.current_keys, array)

    def process_data(self, data):
        """Fold the minimum of `data` along ``self.axis`` into the result."""
        chunk_min = np.min(data, axis=self.axis)
        if self._has_data:
            # Combine with what has been seen so far instead of overwriting
            # it, otherwise only the last piece of data would count.
            self.result = np.minimum(self.result, chunk_min)
        else:
            self.result = chunk_min
            self._has_data = True


class _MinMaskedStreamsHandler(_AggregationStreamsHandler):
    """
    Streams handler computing the minimum of masked data along
    ``self.axis``.

    Assumes, like the sum handlers in this module, that process_data() can
    be called more than once between bootstrap() and finalise(); each call
    is folded into the result rather than replacing it. An element of the
    result stays masked only if it was masked in every piece of data.

    """
    # True once a call to process_data() has contributed to ``self.result``
    # since the last bootstrap().
    _has_data = False

    def bootstrap(self, shape):
        """Reset the handler ready to build a new result of shape `shape`."""
        # Zero-filled placeholder; it is replaced (never combined with) the
        # first piece of data, as zero is not a neutral element for min.
        self.result = np.zeros(shape, dtype=self.array.dtype)
        self._has_data = False

    def finalise(self):
        """Return the current result wrapped in a Chunk."""
        array = self.result
        # Promote array-scalar to 0-dimensional array.
        if array.ndim == 0:
            array = np.ma.array(array)
        return Chunk(self.current_keys, array)

    def process_data(self, data):
        """Fold the minimum of `data` along ``self.axis`` into the result."""
        chunk_min = np.min(data, axis=self.axis)
        if not self._has_data:
            self.result = chunk_min
            self._has_data = True
            return

        # Merge by hand: the values hidden under a mask are arbitrary, so
        # they must never win the comparison, and a masked element on one
        # side must not hide a valid element on the other.
        old_mask = np.ma.getmaskarray(self.result)
        new_mask = np.ma.getmaskarray(chunk_min)
        old_data = np.ma.getdata(self.result)
        new_data = np.ma.getdata(chunk_min)
        take_new = ~new_mask & (old_mask | (new_data < old_data))
        self.result = np.ma.array(np.where(take_new, new_data, old_data),
                                  mask=old_mask & new_mask)


class _MaxStreamsHandler(_AggregationStreamsHandler):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the best way of pulling out the commonality between these different aggregators without having mixed inheritance or multiple inheritance? Or is it better to leave them as is?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd quite like to refactor the aggregation stream handler stuff anyway, so rather than try to do two things in one PR I'm happy to leave these as is. They can serve as an even more obvious reminder to refactor! 😉

def bootstrap(self, shape):
    """Reset the handler ready to build a new result of shape `shape`."""
    # Zero-filled placeholder; it is replaced (never combined with) the
    # first piece of data, as zero is not a neutral element for max.
    self.result = np.zeros(shape, dtype=self.array.dtype)
    self._has_data = False

def finalise(self):
    """Return the current result wrapped in a Chunk."""
    array = self.result
    # Promote array-scalar to 0-dimensional array.
    if array.ndim == 0:
        array = np.array(array)
    return Chunk(self.current_keys, array)

def process_data(self, data):
    """
    Fold the maximum of `data` along ``self.axis`` into the result.

    Assumes, like the sum handlers in this module, that this method can be
    called more than once between bootstrap() and finalise().

    """
    chunk_max = np.max(data, axis=self.axis)
    if getattr(self, '_has_data', False):
        # Combine with what has been seen so far instead of overwriting
        # it, otherwise only the last piece of data would count.
        self.result = np.maximum(self.result, chunk_max)
    else:
        self.result = chunk_max
        self._has_data = True


class _MaxMaskedStreamsHandler(_AggregationStreamsHandler):
    """
    Streams handler computing the maximum of masked data along
    ``self.axis``.

    Assumes, like the sum handlers in this module, that process_data() can
    be called more than once between bootstrap() and finalise(); each call
    is folded into the result rather than replacing it. An element of the
    result stays masked only if it was masked in every piece of data.

    """
    # True once a call to process_data() has contributed to ``self.result``
    # since the last bootstrap().
    _has_data = False

    def bootstrap(self, shape):
        """Reset the handler ready to build a new result of shape `shape`."""
        # Zero-filled placeholder; it is replaced (never combined with) the
        # first piece of data, as zero is not a neutral element for max.
        self.result = np.zeros(shape, dtype=self.array.dtype)
        self._has_data = False

    def finalise(self):
        """Return the current result wrapped in a Chunk."""
        array = self.result
        # Promote array-scalar to 0-dimensional array.
        if array.ndim == 0:
            array = np.ma.array(array)
        return Chunk(self.current_keys, array)

    def process_data(self, data):
        """Fold the maximum of `data` along ``self.axis`` into the result."""
        chunk_max = np.max(data, axis=self.axis)
        if not self._has_data:
            self.result = chunk_max
            self._has_data = True
            return

        # Merge by hand: the values hidden under a mask are arbitrary, so
        # they must never win the comparison, and a masked element on one
        # side must not hide a valid element on the other.
        old_mask = np.ma.getmaskarray(self.result)
        new_mask = np.ma.getmaskarray(chunk_max)
        old_data = np.ma.getdata(self.result)
        new_data = np.ma.getdata(chunk_max)
        take_new = ~new_mask & (old_mask | (new_data > old_data))
        self.result = np.ma.array(np.where(take_new, new_data, old_data),
                                  mask=old_mask & new_mask)


class _SumStreamsHandler(_AggregationStreamsHandler):
    """
    Streams handler accumulating the sum of non-masked data along
    ``self.axis``.

    """
    def bootstrap(self, shape):
        """Start a new running total of zeros using the source dtype."""
        self.running_total = np.zeros(shape, dtype=self.array.dtype)

    def finalise(self):
        """Return the running total wrapped in a Chunk."""
        total = self.running_total
        if total.ndim == 0:
            # Promote array-scalar to 0-dimensional array.
            total = np.array(total)
        return Chunk(self.current_keys, total)

    def process_data(self, data):
        """Add the sum of `data` along ``self.axis`` to the running total."""
        partial = np.sum(data, axis=self.axis)
        self.running_total += partial


class _SumMaskedStreamsHandler(_AggregationStreamsHandler):
    """
    Streams handler accumulating the sum of masked data along
    ``self.axis``.

    """
    def bootstrap(self, shape):
        """Start a new masked running total of zeros using the source dtype."""
        self.running_total = np.ma.zeros(shape, dtype=self.array.dtype)

    def finalise(self):
        """Return the running total wrapped in a Chunk."""
        total = self.running_total
        if total.ndim == 0:
            # Promote array-scalar to 0-dimensional array.
            total = np.ma.array(total)
        return Chunk(self.current_keys, total)

    def process_data(self, data):
        """Add the sum of `data` along ``self.axis`` to the running total."""
        partial = np.sum(data, axis=self.axis)
        # NOTE(review): in-place addition of masked arrays appears to mask
        # any element that is masked in either operand, so one fully masked
        # piece of data may mask the whole total for that element - confirm
        # this is the intended behaviour.
        self.running_total += partial


class _MeanStreamsHandler(_AggregationStreamsHandler):
def __init__(self, array, axis, mdtol):
# The mdtol argument is not applicable to non-masked arrays
Expand Down Expand Up @@ -1587,6 +1683,93 @@ def _normalise_axis(axis, array):
return axes


def min(a, axis=None):
    """
    Request the minimum of an Array over any number of axes.

    .. note:: Currently limited to operating on a single axis.

    Parameters
    ----------
    a : Array object
        The object whose minimum is to be found.
    axis : None, or int, or iterable of ints
        Axis or axes along which the operation is performed. The default
        (axis=None) is to perform the operation over all the dimensions of the
        input array. The axis may be negative, in which case it counts from
        the last to the first axis. If axis is a tuple of ints, the operation
        is performed over multiple axes.

    Returns
    -------
    out : Array
        The Array representing the requested minimum.

    Raises
    ------
    AssertionError
        If `axis` does not resolve to exactly one axis (see the note above).

    """
    axes = _normalise_axis(axis, a)
    # Raise explicitly rather than via `assert` so that the single-axis
    # restriction is still enforced when Python runs with -O. The exception
    # type is unchanged for existing callers.
    if axes is None or len(axes) != 1:
        raise AssertionError('min() currently requires exactly one axis; '
                             'got axis={!r}'.format(axis))
    return _Aggregation(a, axes[0],
                        _MinStreamsHandler, _MinMaskedStreamsHandler,
                        a.dtype, {})


def max(a, axis=None):
    """
    Request the maximum of an Array over any number of axes.

    .. note:: Currently limited to operating on a single axis.

    Parameters
    ----------
    a : Array object
        The object whose maximum is to be found.
    axis : None, or int, or iterable of ints
        Axis or axes along which the operation is performed. The default
        (axis=None) is to perform the operation over all the dimensions of the
        input array. The axis may be negative, in which case it counts from
        the last to the first axis. If axis is a tuple of ints, the operation
        is performed over multiple axes.

    Returns
    -------
    out : Array
        The Array representing the requested max.

    Raises
    ------
    AssertionError
        If `axis` does not resolve to exactly one axis (see the note above).

    """
    axes = _normalise_axis(axis, a)
    # Raise explicitly rather than via `assert` so that the single-axis
    # restriction is still enforced when Python runs with -O. The exception
    # type is unchanged for existing callers.
    if axes is None or len(axes) != 1:
        raise AssertionError('max() currently requires exactly one axis; '
                             'got axis={!r}'.format(axis))
    return _Aggregation(a, axes[0],
                        _MaxStreamsHandler, _MaxMaskedStreamsHandler,
                        a.dtype, {})


def sum(a, axis=None):
    """
    Request the sum of an Array over any number of axes.

    .. note:: Currently limited to operating on a single axis.

    Parameters
    ----------
    a : Array object
        The object whose summation is to be found.
    axis : None, or int, or iterable of ints
        Axis or axes along which the operation is performed. The default
        (axis=None) is to perform the operation over all the dimensions of the
        input array. The axis may be negative, in which case it counts from
        the last to the first axis. If axis is a tuple of ints, the operation
        is performed over multiple axes.

    Returns
    -------
    out : Array
        The Array representing the requested sum.

    Raises
    ------
    AssertionError
        If `axis` does not resolve to exactly one axis (see the note above).

    """
    axes = _normalise_axis(axis, a)
    # Raise explicitly rather than via `assert` so that the single-axis
    # restriction is still enforced when Python runs with -O. The exception
    # type is unchanged for existing callers.
    if axes is None or len(axes) != 1:
        raise AssertionError('sum() currently requires exactly one axis; '
                             'got axis={!r}'.format(axis))
    return _Aggregation(a, axes[0],
                        _SumStreamsHandler, _SumMaskedStreamsHandler,
                        a.dtype, {})


def mean(a, axis=None, mdtol=1):
"""
Request the mean of an Array over any number of axes.
Expand Down
160 changes: 160 additions & 0 deletions biggus/tests/unit/_aggregation_test_framework.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# (C) British Crown Copyright 2014, Met Office
#
# This file is part of Biggus.
#
# Biggus is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Biggus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Biggus. If not, see <http://www.gnu.org/licenses/>.
"""Unit tests for `biggus` aggregation operators."""
from abc import ABCMeta, abstractproperty

import numpy as np
import numpy.ma as ma

import biggus


class Operator(object):
    """
    Base for the aggregation test mix-ins.

    Declares the three operator properties that the mix-in test classes in
    this module read from ``self``.

    """
    __metaclass__ = ABCMeta

    @abstractproperty
    def biggus_operator(self):
        """The biggus aggregation function under test."""

    @abstractproperty
    def numpy_operator(self):
        """The numpy function giving the expected non-masked result."""

    @abstractproperty
    def numpy_masked_operator(self):
        """The numpy function giving the expected masked result."""


class InvalidAxis(Operator):
    """Checks that unsupported `axis` arguments are rejected."""

    def setUp(self):
        self.array = biggus.NumpyArrayAdapter(np.arange(12))

    def test_none(self):
        # Leaving axis at its default is expected to fail the single-axis
        # check.
        self.assertRaises(AssertionError, self.biggus_operator, self.array)

    def test_too_large(self):
        self.assertRaises(ValueError,
                          self.biggus_operator, self.array, axis=1)

    def test_too_small(self):
        self.assertRaises(ValueError,
                          self.biggus_operator, self.array, axis=-2)

    def test_multiple(self):
        two_dim = biggus.NumpyArrayAdapter(np.arange(12).reshape(3, 4))
        self.assertRaises(AssertionError,
                          self.biggus_operator, two_dim, axis=(0, 1))


class AggregationDtype(Operator):
    """Checks the dtype reported by the aggregation under test."""

    def _check(self, source):
        # Default behaviour is for operators which inherit their dtype from
        # the objects on which they perform the aggregation.
        array = biggus.NumpyArrayAdapter(np.arange(2, dtype=source))
        agg = self.biggus_operator(array, axis=0)
        self.assertEqual(agg.dtype, source)

    def test_dtype_equal_source_dtype(self):
        # The builtin `int` is what the `np.int` alias referred to; the
        # alias itself has been removed from recent NumPy releases.
        dtypes = [np.int8, np.int16, np.int32, int]
        for dtype in dtypes:
            self._check(dtype)


class NumpyArrayAdapter(Operator):
    """
    Compares the biggus operator with the numpy operator on non-masked
    data, aggregating over axis 0.

    """
    def setUp(self):
        self.data = np.arange(12)

    def _check(self, data, dtype=None, shape=None):
        """Assert biggus and numpy agree for `data` cast and reshaped."""
        data = np.asarray(data, dtype=dtype)
        if shape is not None:
            data = data.reshape(shape)
        array = biggus.NumpyArrayAdapter(data)
        result = self.biggus_operator(array, axis=0).ndarray()
        expected = self.numpy_operator(data, axis=0)
        # Promote array-scalar to 0-dimensional array for the comparison.
        if expected.ndim == 0:
            expected = np.asarray(expected)
        np.testing.assert_array_equal(result, expected)

    def test_flat_int(self):
        self._check(self.data)

    def test_multi_int(self):
        self._check(self.data, shape=(3, 4))

    # The builtin `float` is what the `np.float` alias referred to; the
    # alias itself has been removed from recent NumPy releases.
    def test_flat_float(self):
        self._check(self.data, dtype=float)

    def test_multi_float(self):
        self._check(self.data, dtype=float, shape=(3, 4))


class NumpyArrayAdapterMasked():
    """
    Compares the biggus operator with the numpy masked operator on masked
    data, aggregating over axis 0.

    The builtin `int` and `float` are used as dtypes throughout: they are
    what the `np.int` and `np.float` aliases referred to, and the aliases
    themselves have been removed from recent NumPy releases.

    """
    def _check(self, data):
        """Assert biggus and numpy agree on both values and mask."""
        array = biggus.NumpyArrayAdapter(data)
        result = self.biggus_operator(array, axis=0).masked_array()
        expected = self.numpy_masked_operator(data, axis=0)
        if expected.ndim == 0:
            if expected is np.ma.masked:
                # The masked constant has its own dtype, so coerce it to
                # the dtype of the source before comparing.
                expected = ma.asarray(expected, dtype=array.dtype)
            else:
                expected = ma.asarray(expected)
        np.testing.assert_array_equal(result.filled(), expected.filled())
        np.testing.assert_array_equal(result.mask, expected.mask)

    def test_no_mask_flat(self):
        for dtype in [int, float]:
            data = ma.arange(12, dtype=dtype)
            self._check(data)

    def test_no_mask_multi(self):
        for dtype in [int, float]:
            data = ma.arange(12, dtype=dtype).reshape(3, 4)
            self._check(data)

    def test_flat(self):
        for dtype in [int, float]:
            data = ma.arange(12, dtype=dtype)
            data[::2] = ma.masked
            self._check(data)

            data.mask = ma.nomask
            data[1::2] = ma.masked
            self._check(data)

    def test_all_masked(self):
        data = ma.arange(12, dtype=int)
        data[:] = ma.masked
        self._check(data)

    def test_multi(self):
        for dtype in [int, float]:
            data = ma.arange(12, dtype=dtype)
            data[::2] = ma.masked
            self._check(data.reshape(3, 4))

            data = ma.arange(12, dtype=dtype)
            data[1::2] = ma.masked
            self._check(data.reshape(3, 4))

            data = ma.arange(12, dtype=dtype).reshape(3, 4)
            data[::2] = ma.masked
            self._check(data)

            data = ma.arange(12, dtype=dtype).reshape(3, 4)
            data[1::2] = ma.masked
            self._check(data)
Loading