Skip to content

Commit

Permalink
Merge pull request #308 from jochen-ott-by/improve-performance-for-in…
Browse files Browse the repository at this point in the history
…dex-remove-partitions

Improve performance for index remove partitions
  • Loading branch information
marco-neumann-by authored Jul 14, 2020
2 parents 549288f + f9eb443 commit 2cf87df
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
6 changes: 6 additions & 0 deletions asv_bench/benchmarks/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ class Index(IndexBase):
)
param_names = ["number_values", "number_partitions", "dtype"]

def time_remove_partitions_inplace(
self, number_values, number_partitions, arrow_type
):
partitions_to_remove = self.partition_values[len(self.partition_values) // 2 :]
self.ktk_index.remove_partitions(partitions_to_remove, inplace=True)

def time_load_index(self, number_values, number_partitions, arrow_type):
self.ktk_index_not_loaded.load(self.store)

Expand Down
23 changes: 11 additions & 12 deletions kartothek/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,18 +343,17 @@ def remove_partitions(
inplace: bool, (default: False)
If `True` the operation is performed inplace and will return the same object
"""
partitions_to_delete = list_of_partitions
if not partitions_to_delete:
if not list_of_partitions:
return self
partitions_to_delete = set(list_of_partitions)
if inplace:
values_to_remove = []
values_to_remove = set()
for val, partition_list in self.index_dct.items():
for partition_label in partitions_to_delete:
if partition_label in partition_list:
partition_list.remove(partition_label)
if len(partition_list) == 0:
values_to_remove.append(val)
break
new_partition_set = set(partition_list) - partitions_to_delete
if new_partition_set:
self.index_dct[val][:] = new_partition_set
else:
values_to_remove.add(val)
for val in values_to_remove:
del self.index_dct[val]
# Call the constructor again to reinit the creation timestamp
Expand All @@ -364,9 +363,9 @@ def remove_partitions(
else:
new_index_dict = {}
for val, partition_list in self.index_dct.items():
new_list = set(partition_list) - set(partitions_to_delete)
if len(new_list) > 0:
new_index_dict[val] = list(new_list)
new_partition_set = set(partition_list) - partitions_to_delete
if new_partition_set:
new_index_dict[val] = list(new_partition_set)
return self.copy(
column=self.column, index_dct=new_index_dict, dtype=self.dtype
)
Expand Down

0 comments on commit 2cf87df

Please sign in to comment.