From 51e10be6580ee2759d6bcf10bfe2e57cb2870a92 Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Mon, 20 Sep 2021 10:20:50 -0700 Subject: [PATCH] Merge PR#4 into main as a single commit Commands: pushd /tmp; git clone https://github.com/letsencrypt/mariadb-sequential-partition-manager-py.git; popd pushd /tmp/mariadb-sequential-partition-manager-py; git checkout -b pr-branch origin/pr-branch; popd git checkout main; git fetch origin; git reset --hard main cp -a /tmp/mariadb-sequential-partition-manager-py/* . git commit -a --- README.md | 79 +++- partitionmanager/bootstrap.py | 64 +-- partitionmanager/bootstrap_test.py | 1 - partitionmanager/cli.py | 203 +++++---- partitionmanager/cli_test.py | 54 ++- partitionmanager/sql.py | 48 +- partitionmanager/sql_test.py | 14 +- partitionmanager/stats.py | 43 +- partitionmanager/table_append_partition.py | 427 +++++++++++------- .../table_append_partition_test.py | 366 ++++++++++----- partitionmanager/tools.py | 16 +- partitionmanager/types.py | 383 +++++++++------- partitionmanager/types_test.py | 69 ++- 13 files changed, 1057 insertions(+), 710 deletions(-) diff --git a/README.md b/README.md index 8099b5a..a926d1a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ [![Build Status](https://circleci.com/gh/letsencrypt/mariadb-sequential-partition-manager-py.svg?style=shield)](https://circleci.com/gh/letsencrypt/mariadb-sequential-partition-manager-py) ![Maturity Level: Beta](https://img.shields.io/badge/maturity-beta-blue.svg) +# Partman + This tool partitions and manages MariaDB tables by sequential IDs. This is primarily a mechanism for dropping large numbers of rows of data without using `DELETE` statements. @@ -11,13 +13,48 @@ Similar tools: * https://github.com/davidburger/gomypartition, intended for tables with date-based partitions * https://github.com/yahoo/mysql_partition_manager, which is archived and in pure SQL -# Usage +## Usage + +```sh + → git clone https://github.com/letsencrypt/mariadb-sequential-partition-manager-py.git + → cd mariadb-sequential-partition-manager-py + → python3 -m venv .venv + → . .venv/bin/activate + → python3 -m pip install . + → tee /tmp/partman.conf.yml < data if any @@ -67,9 +60,7 @@ def __init__(self): self.statement = None def parse(self, data): - """ - Return rows from an XML Result object. - """ + """Return rows from an XML Result object.""" if self.rows is not None: raise ValueError("XmlResult objects can only be used once") @@ -77,7 +68,7 @@ def parse(self, data): self.xmlparser.Parse(data) if self.current_elements: - raise TruncatedDatabaseResultException( + raise partitionmanager.types.TruncatedDatabaseResultException( f"These XML tags are unclosed: {self.current_elements}" ) return self.rows @@ -112,7 +103,7 @@ def _end_element(self, name): assert self.current_field is not None value = self.current_row[self.current_field] if value: - self.current_row[self.current_field] = destring(value) + self.current_row[self.current_field] = _destring(value) self.current_field = None def _char_data(self, data): @@ -123,9 +114,9 @@ def _char_data(self, data): self.current_row[self.current_field] += data -class SubprocessDatabaseCommand(DatabaseCommand): - """ - Run a database command via the CLI tool, getting the results in XML form. +class SubprocessDatabaseCommand(partitionmanager.types.DatabaseCommand): + """Run a database command via the CLI tool, getting the results in XML form. + This can be very convenient without explicit port-forwarding, but is a little slow. """ @@ -147,15 +138,16 @@ def run(self, sql_cmd): def db_name(self): rows = self.run("SELECT DATABASE();") if len(rows) != 1: - raise TableInformationException("Expected one result") + raise partitionmanager.types.TableInformationException( + "Expected one result" + ) + return partitionmanager.types.SqlInput(rows[0]["DATABASE()"]) - return SqlInput(rows[0]["DATABASE()"]) +class IntegratedDatabaseCommand(partitionmanager.types.DatabaseCommand): + """Run a database command via a direct socket connection and pymysql. -class IntegratedDatabaseCommand(DatabaseCommand): - """ - Run a database command via a direct socket connection and pymysql, a pure - Python PEP 249-compliant database connector. + Pymysql is a pure Python PEP 249-compliant database connector. """ def __init__(self, url): @@ -175,7 +167,7 @@ def __init__(self, url): ) def db_name(self): - return SqlInput(self.db) + return partitionmanager.types.SqlInput(self.db) def run(self, sql_cmd): with self.connection.cursor() as cursor: diff --git a/partitionmanager/sql_test.py b/partitionmanager/sql_test.py index 3fee929..a58e9b3 100644 --- a/partitionmanager/sql_test.py +++ b/partitionmanager/sql_test.py @@ -1,16 +1,16 @@ import unittest -from .sql import destring, XmlResult +from .sql import _destring, XmlResult from .types import TruncatedDatabaseResultException class TestSubprocessParsing(unittest.TestCase): def test_destring(self): - self.assertEqual(destring("not a number"), "not a number") - self.assertEqual(destring("99999"), 99999) - self.assertEqual(destring("999.99"), 999.99) - self.assertEqual(destring("9.9999"), 9.9999) - self.assertEqual(destring("1/2"), "1/2") - self.assertEqual(destring("NULL"), "NULL") + self.assertEqual(_destring("not a number"), "not a number") + self.assertEqual(_destring("99999"), 99999) + self.assertEqual(_destring("999.99"), 999.99) + self.assertEqual(_destring("9.9999"), 9.9999) + self.assertEqual(_destring("1/2"), "1/2") + self.assertEqual(_destring("NULL"), "NULL") def test_single_row(self): o = XmlResult().parse( diff --git a/partitionmanager/stats.py b/partitionmanager/stats.py index a5222a0..0c480e5 100644 --- a/partitionmanager/stats.py +++ b/partitionmanager/stats.py @@ -5,14 +5,12 @@ import logging from datetime import timedelta -from .types import MaxValuePartition, Partition, UnexpectedPartitionException -from .tools import pairwise +import partitionmanager.tools +import partitionmanager.types class PrometheusMetric: - """ - Represents a single named metric for Prometheus - """ + """Represents a single named metric for Prometheus""" def __init__(self, name, table, data): self.name = name @@ -21,9 +19,7 @@ def __init__(self, name, table, data): class PrometheusMetrics: - """ - A set of metrics that can be rendered for Prometheus. - """ + """A set of metrics that can be rendered for Prometheus.""" def __init__(self): self.metrics = dict() @@ -31,24 +27,20 @@ def __init__(self): self.types = dict() def add(self, name, table, data): - """ - Record metric data representing the name and table. - """ + """Record metric data representing the name and table.""" if name not in self.metrics: self.metrics[name] = list() self.metrics[name].append(PrometheusMetric(name, table, data)) def describe(self, name, help_text=None, type_name=None): - """ - Add optional descriptive and type data for a given metric name. - """ + """Add optional descriptive and type data for a given metric name.""" self.help[name] = help_text self.types[name] = type_name def render(self, fp): - """ - Write the collected metrics to the supplied file-like object, following - the format specification: + """Write the collected metrics to the supplied file-like object. + + Follows the format specification: https://prometheus.io/docs/instrumenting/exposition_formats/ """ for n, metrics in self.metrics.items(): @@ -63,9 +55,7 @@ def render(self, fp): def get_statistics(partitions, current_timestamp, table): - """ - Return a dictionary of statistics about the supplied table's partitions. - """ + """Return a dictionary of statistics about the supplied table's partitions.""" log = logging.getLogger("get_statistics") results = {"partitions": len(partitions)} @@ -73,24 +63,24 @@ def get_statistics(partitions, current_timestamp, table): return results for p in partitions: - if not isinstance(p, Partition): + if not partitionmanager.types.is_partition_type(p): log.warning( f"{table} get_statistics called with a partition list " + f"that included a non-Partition entry: {p}" ) - raise UnexpectedPartitionException(p) + raise partitionmanager.types.UnexpectedPartitionException(p) head_part = None tail_part = partitions[-1] - if not isinstance(tail_part, MaxValuePartition): + if not isinstance(tail_part, partitionmanager.types.MaxValuePartition): log.warning( f"{table} get_statistics called with a partition list tail " + f"that wasn't a MaxValuePartition: {tail_part}" ) - raise UnexpectedPartitionException(tail_part) + raise partitionmanager.types.UnexpectedPartitionException(tail_part) - if tail_part.has_time and tail_part.timestamp(): + if tail_part.has_real_time and tail_part.timestamp(): results["time_since_newest_partition"] = ( current_timestamp - tail_part.timestamp() ) @@ -116,7 +106,7 @@ def get_statistics(partitions, current_timestamp, table): ) / (len(partitions) - 1) max_d = timedelta() - for a, b in pairwise(partitions): + for a, b in partitionmanager.tools.pairwise(partitions): if not a.timestamp() or not b.timestamp(): log.debug(f"{table} had partitions that aren't comparable: {a} and {b}") continue @@ -126,5 +116,4 @@ def get_statistics(partitions, current_timestamp, table): if max_d > timedelta(): results["max_partition_delta"] = max_d - return results diff --git a/partitionmanager/table_append_partition.py b/partitionmanager/table_append_partition.py index fae5c58..f2349be 100644 --- a/partitionmanager/table_append_partition.py +++ b/partitionmanager/table_append_partition.py @@ -7,77 +7,61 @@ import operator import re -from partitionmanager.types import ( - ChangePlannedPartition, - DuplicatePartitionException, - InstantPartition, - MaxValuePartition, - MismatchedIdException, - NewPlannedPartition, - NoEmptyPartitionsAvailableException, - Partition, - PlannedPartition, - PositionPartition, - SqlInput, - Table, - TableInformationException, - UnexpectedPartitionException, -) -from .tools import pairwise, iter_show_end - - -def table_is_compatible(database, table): - """ - Gather the information schema from the database command and parse out the - autoincrement value. - """ +import partitionmanager.types +import partitionmanager.tools + + +def get_table_compatibility_problems(database, table): + """Return a list of strings of problems altering this table, or empty.""" db_name = database.db_name() if ( - not isinstance(db_name, SqlInput) - or not isinstance(table, Table) - or not isinstance(table.name, SqlInput) + not isinstance(db_name, partitionmanager.types.SqlInput) + or not isinstance(table, partitionmanager.types.Table) + or not isinstance(table.name, partitionmanager.types.SqlInput) ): - return f"Unexpected table type: {table}" + return [f"Unexpected table type: {table}"] + sql_cmd = ( "SELECT CREATE_OPTIONS FROM INFORMATION_SCHEMA.TABLES " + f"WHERE TABLE_SCHEMA='{db_name}' and TABLE_NAME='{table.name}';" ).strip() - - return table_information_schema_is_compatible(database.run(sql_cmd), table.name) + return _get_table_information_schema_problems(database.run(sql_cmd), table.name) -def table_information_schema_is_compatible(rows, table_name): - """ - Parse a table information schema, validating options including existence of - each table - """ +def _get_table_information_schema_problems(rows, table_name): + """Return a string representing problems partitioning this table, or None.""" if len(rows) != 1: - return f"Unable to read information for {table_name}" + return [f"Unable to read information for {table_name}"] options = rows[0] if "partitioned" not in options["CREATE_OPTIONS"]: - return f"Table {table_name} is not partitioned" - - return None + return [f"Table {table_name} is not partitioned"] + return list() def get_current_positions(database, table, columns): + """Get positions of the columns in the table. + + Return as a dictionary of {column_name: position} """ - Get the positions of the columns provided in the given table, return - as a dictionary of {column_name: position} - """ - if not isinstance(columns, list) or not isinstance(table, Table): + if not isinstance(columns, list) or not isinstance( + table, partitionmanager.types.Table + ): raise ValueError("columns must be a list and table must be a Table") positions = dict() for column in columns: + if not isinstance(column, str): + raise ValueError("columns must be a list of strings") sql = f"SELECT {column} FROM `{table.name}` ORDER BY {column} DESC LIMIT 1;" rows = database.run(sql) if len(rows) > 1: - raise TableInformationException(f"Expected one result from {table.name}") + raise partitionmanager.types.TableInformationException( + f"Expected one result from {table.name}" + ) if not rows: - raise TableInformationException( + raise partitionmanager.types.TableInformationException( f"Table {table.name} appears to be empty. (No results)" ) positions[column] = rows[0][column] @@ -85,19 +69,23 @@ def get_current_positions(database, table, columns): def get_partition_map(database, table): - """ - Gather the partition map via the database command tool. - """ - if not isinstance(table, Table) or not isinstance(table.name, SqlInput): + """Gather the partition map via the database command tool.""" + if not isinstance(table, partitionmanager.types.Table) or not isinstance( + table.name, partitionmanager.types.SqlInput + ): raise ValueError("Unexpected type") sql_cmd = f"SHOW CREATE TABLE `{table.name}`;" - return parse_partition_map(database.run(sql_cmd)) + return _parse_partition_map(database.run(sql_cmd)) -def parse_partition_map(rows): - """ - Read a partition statement from a table creation string and produce Partition - objets for each partition. +def _parse_partition_map(rows): + """Return a dictionary of range_cols and partition objects. + + The "range_cols" is the ordered list of what columns are used as the + range identifiers for the partitions. + + The "partitions" is a list of the Partition objects representing each + defined partition. There will be at least one partitionmanager.types.MaxValuePartition. """ log = logging.getLogger("parse_partition_map") @@ -115,7 +103,7 @@ def parse_partition_map(rows): partitions = list() if len(rows) != 1: - raise TableInformationException("Expected one result") + raise partitionmanager.types.TableInformationException("Expected one result") options = rows[0] @@ -134,7 +122,7 @@ def parse_partition_map(rows): part_vals = [int(x.strip("` ")) for x in part_vals_str.split(",")] if range_cols is None: - raise TableInformationException( + raise partitionmanager.types.TableInformationException( "Processing partitions, but the partition definition wasn't found." ) @@ -142,88 +130,108 @@ def parse_partition_map(rows): log.error( f"Partition columns {part_vals} don't match the partition range {range_cols}" ) - raise MismatchedIdException("Partition columns mismatch") + raise partitionmanager.types.MismatchedIdException( + "Partition columns mismatch" + ) - pos_part = PositionPartition(part_name).set_position(part_vals) + pos_part = partitionmanager.types.PositionPartition(part_name).set_position( + part_vals + ) partitions.append(pos_part) member_tail = partition_tail.match(l) if member_tail: if range_cols is None: - raise TableInformationException( + raise partitionmanager.types.TableInformationException( "Processing tail, but the partition definition wasn't found." ) part_name = member_tail.group("name") log.debug(f"Found tail partition named {part_name}") - partitions.append(MaxValuePartition(part_name, len(range_cols))) - - if not partitions or not isinstance(partitions[-1], MaxValuePartition): - raise UnexpectedPartitionException("There was no tail partition") + partitions.append( + partitionmanager.types.MaxValuePartition(part_name, len(range_cols)) + ) + if not partitions or not isinstance( + partitions[-1], partitionmanager.types.MaxValuePartition + ): + raise partitionmanager.types.UnexpectedPartitionException( + "There was no tail partition" + ) return {"range_cols": range_cols, "partitions": partitions} -def split_partitions_around_positions(partition_list, current_positions): - """ - Split a partition_list into those for which _all_ values are less than - current_positions, a single partition whose values contain current_positions, - and a list of all the others. +def _split_partitions_around_position(partition_list, current_position): + """Divide up a partition list to three parts: filled, current, and empty. + + The first part is the filled partition list: those partitions for which + _all_ values are less than current_position. + + The second is the a single partition whose values contain current_position. + + The third part is a list of all the other, empty partitions yet-to-be-filled. """ for p in partition_list: - if not isinstance(p, Partition): - raise UnexpectedPartitionException(p) - if not isinstance(current_positions, list): + if not partitionmanager.types.is_partition_type(p): + raise partitionmanager.types.UnexpectedPartitionException(p) + if not isinstance(current_position, partitionmanager.types.Position): raise ValueError() less_than_partitions = list() greater_or_equal_partitions = list() for p in partition_list: - if p < current_positions: + if p < current_position: less_than_partitions.append(p) else: greater_or_equal_partitions.append(p) # The active partition is always the first in the list of greater_or_equal active_partition = greater_or_equal_partitions.pop(0) - return less_than_partitions, active_partition, greater_or_equal_partitions -def get_position_increase_per_day(p1, p2): - """ - Return a list containing the change in positions between p1 and p2 divided +def _get_position_increase_per_day(p1, p2): + """Return the rate of change between two position-lists, in positions/day. + + Returns a list containing the change in positions between p1 and p2 divided by the number of days between them, as "position increase per day", or raise ValueError if p1 is not before p2, or if either p1 or p2 does not have a position. For partitions with only a single position, this will be a list of size 1. """ - if not isinstance(p1, PositionPartition) or not isinstance(p2, PositionPartition): - raise ValueError("Both partitions must be PositionPartition type") + if not isinstance(p1, partitionmanager.types.PositionPartition) or not isinstance( + p2, partitionmanager.types.PositionPartition + ): + raise ValueError( + "Both partitions must be partitionmanager.types.PositionPartition type" + ) if None in (p1.timestamp(), p2.timestamp()): # An empty list skips this pair in get_weighted_position_increase return list() if p1.timestamp() >= p2.timestamp(): - raise ValueError(f"p1 {p1} must be before p2 {p2}") + raise ValueError(f"p1 {p1} must have a timestamp before p2 {p2}") if p1.num_columns != p2.num_columns: raise ValueError(f"p1 {p1} and p2 {p2} must have the same number of columns") delta_time = p2.timestamp() - p1.timestamp() delta_days = delta_time / timedelta(days=1) - delta_positions = list(map(operator.sub, p2.positions, p1.positions)) + delta_positions = list( + map(operator.sub, p2.position.as_list(), p1.position.as_list()) + ) return list(map(lambda pos: pos / delta_days, delta_positions)) -def generate_weights(count): - """ - Generate a static list of geometricly-decreasing values, starting from - 10,000 to give a high ceiling. It could be dynamic, but eh. +def _generate_weights(count): + """Static list of geometrically-decreasing weights. + + Starts from 10,000 to give a high ceiling. It could be dynamic, but eh. """ return [10_000 / x for x in range(count, 0, -1)] -def get_weighted_position_increase_per_day_for_partitions(partitions): - """ - For the provided list of partitions, uses the get_position_increase_per_day +def _get_weighted_position_increase_per_day_for_partitions(partitions): + """Get weighted partition-position-increase-per-day as a position-list. + + For the provided list of partitions, uses the _get_position_increase_per_day method to generate a list position increment rates in positions/day, then uses a geometric weight to make more recent rates influence the outcome more, and returns a final list of weighted partition-position-increase-per- @@ -233,9 +241,10 @@ def get_weighted_position_increase_per_day_for_partitions(partitions): raise ValueError("Partition list must not be empty") pos_rates = [ - get_position_increase_per_day(p1, p2) for p1, p2 in pairwise(partitions) + _get_position_increase_per_day(p1, p2) + for p1, p2 in partitionmanager.tools.pairwise(partitions) ] - weights = generate_weights(len(pos_rates)) + weights = _generate_weights(len(pos_rates)) # Initialize a list with a zero for each position weighted_sums = [0] * partitions[0].num_columns @@ -243,14 +252,14 @@ def get_weighted_position_increase_per_day_for_partitions(partitions): for p_r, weight in zip(pos_rates, weights): for idx, val in enumerate(p_r): weighted_sums[idx] += val * weight - return list(map(lambda x: x / sum(weights), weighted_sums)) -def predict_forward_position(current_positions, rate_of_change, duration): - """ - Move current_positions forward a given duration at the provided rates of - change. The rate and the duration must be compatible units, and both the +def _predict_forward_position(current_positions, rate_of_change, duration): + """Return a predicted future position as a position-list. + + This moves current_positions forward a given duration at the provided rates + of change. The rate and the duration must be compatible units, and both the positions and the rate must be lists of the same size. """ if len(current_positions) != len(rate_of_change): @@ -261,91 +270,122 @@ def predict_forward_position(current_positions, rate_of_change, duration): f"Can't predict forward with a negative rate of change: {neg_rate}" ) - increase = list(map(lambda x: x * duration / timedelta(days=1), rate_of_change)) + increase = list(map(lambda x: x * (duration / timedelta(days=1)), rate_of_change)) predicted_positions = [int(p + i) for p, i in zip(current_positions, increase)] for old, new in zip(current_positions, predicted_positions): assert new >= old, f"Always predict forward, {new} < {old}" return predicted_positions -def predict_forward_time(current_positions, end_positions, rates, evaluation_time): - """ - Given the current_positions and the rates, determine the timestamp of when - the positions will reach ALL end_positions. +def _predict_forward_time(current_position, end_position, rates, evaluation_time): + """Return a predicted datetime of when we'll exceed the end position-list. + + Given the current_position position-list and the rates, this calculates + a timestamp of when the positions will be beyond ALL of the end_positions + position-list, as that is MariaDB's definition of when to start filling a + partition. """ - if not len(current_positions) == len(end_positions) == len(rates): + if not isinstance( + current_position, partitionmanager.types.Position + ) or not isinstance(end_position, partitionmanager.types.Position): + raise ValueError("Expected to be given Position types") + + if not len(current_position) == len(end_position) == len(rates): raise ValueError("Expected identical list sizes") - for neg_rate in filter(lambda r: r < 0, rates): + for neg_rate in filter(lambda r: r <= 0, rates): raise ValueError( - f"Can't predict forward with a negative rate of change: {neg_rate}" + f"Can't predict forward with a non-positive rate of change: " + f"{neg_rate} / {rates}" ) days_remaining = [ (end - now) / rate - for now, end, rate in zip(current_positions, end_positions, rates) + for now, end, rate in zip( + current_position.as_list(), end_position.as_list(), rates + ) ] if max(days_remaining) < 0: raise ValueError(f"All values are negative: {days_remaining}") - return evaluation_time + (max(days_remaining) * timedelta(days=1)) -def calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan): - """ - Partition start times should never be in the past. +def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan): + """Return a start time to be used in the partition planning. + + This is a helper method that doesn't always return strictly + last_changed_time + allowed_lifespan, it prohibits times in the past, + returning evaluation_time instead, to ensure that we don't try to set + newly constructed partitions in the past. """ partition_start_time = last_changed_time + allowed_lifespan if partition_start_time < evaluation_time: + # Partition start times should never be in the past. return evaluation_time return partition_start_time -def plan_partition_changes( +def _plan_partition_changes( partition_list, - current_positions, + current_position, evaluation_time, allowed_lifespan, num_empty_partitions, ): - """ - Produces a list of partitions that should be modified or created in order - to meet the supplied table requirements, using an estimate as to the rate of - fill. + """Return a list of partitions to modify or create. + + This method makes recommendations in order to meet the supplied table + requirements, using an estimate as to the rate of fill from the supplied + partition_list, current_position, and evaluation_time. """ log = logging.getLogger("plan_partition_changes") - filled_partitions, active_partition, empty_partitions = split_partitions_around_positions( - partition_list, current_positions + filled_partitions, active_partition, empty_partitions = _split_partitions_around_position( + partition_list, current_position ) if not empty_partitions: - log.warning( + log.error( f"Partition {active_partition.name} requires manual ALTER " - "as this tool won't bisect the partition to determine a" - "rate of fill to make a prediction for new partitions." + "as without an empty partition to manipulate, you'll need to " + "perform an expensive copy operation. See the bootstrap mode." ) - raise NoEmptyPartitionsAvailableException() + raise partitionmanager.types.NoEmptyPartitionsAvailableException() if not active_partition: raise Exception("Active Partition can't be None") - if active_partition.timestamp() >= evaluation_time: - raise ValueError( - f"Evaluation time ({evaluation_time}) must be after " - f"the active partition {active_partition}." + rate_relevant_partitions = None + + if active_partition.timestamp() < evaluation_time: + # This bit of weirdness is a fencepost issue: The partition list is strictly + # increasing until we get to "now" and the active partition. "Now" actually + # takes place _after_ active partition's start date (naturally), but + # contains a position that is before the top of active, by definition. For + # the rate processing to work, we need to swap the "now" and the active + # partition's dates and positions. + rate_relevant_partitions = filled_partitions + [ + partitionmanager.types.InstantPartition( + active_partition.timestamp(), current_position + ), + partitionmanager.types.InstantPartition( + evaluation_time, active_partition.position + ), + ] + else: + # If the active partition's start date is later than today, then we + # previously mispredicted the rate of change. There's nothing we can + # do about that at this point, except limit our rate-of-change calculation + # to exclude the future-dated, irrelevant partition. + log.debug( + f"Misprediction: Evaluation time ({evaluation_time}) is " + f"before the active partition {active_partition}. Excluding from " + "rate calculations." ) + rate_relevant_partitions = filled_partitions + [ + partitionmanager.types.InstantPartition(evaluation_time, current_position) + ] - # This bit of weirdness is a fencepost issue: The partition list is strictly - # increasing until we get to "now" and the active partition. "Now" actually - # takes place _after_ active partition's start date (naturally), but - # contains a position that is before the top of active, by definition. For - # the rate processing to work, we need to cross the "now" and the active - # partition's dates and positions. - rate_relevant_partitions = filled_partitions + [ - InstantPartition(active_partition.timestamp(), current_positions), - InstantPartition(evaluation_time, active_partition.positions), - ] - rates = get_weighted_position_increase_per_day_for_partitions( + rates = _get_weighted_position_increase_per_day_for_partitions( rate_relevant_partitions ) log.debug( @@ -355,22 +395,22 @@ def plan_partition_changes( # We need to include active_partition in the list for the subsequent # calculations even though we're not actually changing it. - results = [ChangePlannedPartition(active_partition)] + results = [partitionmanager.types.ChangePlannedPartition(active_partition)] # Adjust each of the empty partitions for partition in empty_partitions: last_changed = results[-1] - changed_partition = ChangePlannedPartition(partition) + changed_partition = partitionmanager.types.ChangePlannedPartition(partition) - if isinstance(partition, PositionPartition): + if isinstance(partition, partitionmanager.types.PositionPartition): # We can't change the position on this partition, but we can adjust # the name to be more exact as to what date we expect it to begin # filling. If we calculate the start-of-fill date and it doesn't # match the partition's name, let's rename it and mark it as an # important change. - start_of_fill_time = predict_forward_time( - current_positions, last_changed.positions, rates, evaluation_time + start_of_fill_time = _predict_forward_time( + current_position, last_changed.position, rates, evaluation_time ) if start_of_fill_time.date() != partition.timestamp().date(): @@ -382,16 +422,16 @@ def plan_partition_changes( ) changed_partition.set_timestamp(start_of_fill_time).set_important() - if isinstance(partition, MaxValuePartition): + if isinstance(partition, partitionmanager.types.MaxValuePartition): # Only the tail MaxValuePartitions can get new positions. For those, # we calculate forward what position we expect and use it in the # future. - partition_start_time = calculate_start_time( + partition_start_time = _calculate_start_time( last_changed.timestamp(), evaluation_time, allowed_lifespan ) - changed_part_pos = predict_forward_position( - last_changed.positions, rates, allowed_lifespan + changed_part_pos = _predict_forward_position( + last_changed.position.as_list(), rates, allowed_lifespan ) changed_partition.set_position(changed_part_pos).set_timestamp( partition_start_time @@ -402,15 +442,15 @@ def plan_partition_changes( # Ensure we have the required number of empty partitions while len(results) < num_empty_partitions + 1: last_changed = results[-1] - partition_start_time = calculate_start_time( + partition_start_time = _calculate_start_time( last_changed.timestamp(), evaluation_time, allowed_lifespan ) - new_part_pos = predict_forward_position( - last_changed.positions, rates, allowed_lifespan + new_part_pos = _predict_forward_position( + last_changed.position.as_list(), rates, allowed_lifespan ) results.append( - NewPlannedPartition() + partitionmanager.types.NewPlannedPartition() .set_position(new_part_pos) .set_timestamp(partition_start_time) ) @@ -419,36 +459,38 @@ def plan_partition_changes( results[-1].set_as_max_value() log.debug(f"Planned {results}") - return results -def evaluate_partition_changes(altered_partitions): - """ +def _should_run_changes(altered_partitions): + """Returns True if the changeset should run, otherwise returns False. + Evaluate the list from plan_partition_changes and determine if the set of changes should be performed - if all the changes are minor, they shouldn't - be run. Returns True if the changeset should run, otherwise logs the reason - for skipping and returns False + be run. """ - log = logging.getLogger("evaluate_partition_changes") + log = logging.getLogger("should_run_changes") for p in altered_partitions: - if isinstance(p, NewPlannedPartition): + if isinstance(p, partitionmanager.types.NewPlannedPartition): log.debug(f"{p} is new") return True - if isinstance(p, ChangePlannedPartition): + if isinstance(p, partitionmanager.types.ChangePlannedPartition): if p.important(): log.debug(f"{p} is marked important") return True - return False def generate_sql_reorganize_partition_commands(table, changes): - """ - Generate a series of SQL commands to reorganize the partition in table_name - to match the new changes list. + """Generates SQL commands to reorganize table to apply the changes. + + Args: + + table: a types.Table object + + changes: a list of objects implenting types.PlannedPartition """ log = logging.getLogger(f"generate_sql_reorganize_partition_commands:{table.name}") @@ -456,12 +498,13 @@ def generate_sql_reorganize_partition_commands(table, changes): new_partitions = list() for p in changes: - if not isinstance(p, PlannedPartition): - raise UnexpectedPartitionException(p) - if isinstance(p, NewPlannedPartition): + if isinstance(p, partitionmanager.types.ChangePlannedPartition): + assert not new_partitions, "Modified partitions must precede new partitions" + modified_partitions.append(p) + elif isinstance(p, partitionmanager.types.NewPlannedPartition): new_partitions.append(p) else: - modified_partitions.append(p) + raise partitionmanager.types.UnexpectedPartitionException(p) # If there's not at least one modification, bail out if not new_partitions and not list( @@ -470,11 +513,10 @@ def generate_sql_reorganize_partition_commands(table, changes): log.debug("No partitions have modifications and no new partitions") return - new_part_list = list() partition_names_set = set() for modified_partition, is_final in reversed( - list(iter_show_end(modified_partitions)) + list(partitionmanager.tools.iter_show_end(modified_partitions)) ): # We reverse the iterator so that we always alter the furthest-out partitions # first, so that we are always increasing the number of empty partitions @@ -491,7 +533,9 @@ def generate_sql_reorganize_partition_commands(table, changes): partition_strings = list() for part in new_part_list: if part.name in partition_names_set: - raise DuplicatePartitionException(f"Duplicate {part}") + raise partitionmanager.types.DuplicatePartitionException( + f"Duplicate {part}" + ) partition_names_set.add(part.name) partition_strings.append( @@ -507,3 +551,54 @@ def generate_sql_reorganize_partition_commands(table, changes): log.debug(f"Yielding {alter_cmd}") yield alter_cmd + + +def get_pending_sql_reorganize_partition_commands( + *, + table, + partition_list, + current_position, + allowed_lifespan, + num_empty_partitions, + evaluation_time, +): + """Return a list of SQL commands to produce an optimally-partitioned table. + + This algorithm is described in the README.md file as the Maintain Algorithm. + + Args: + + table: The table name and properties + + partition_list: the currently-existing partition objects, each with + a name and either a starting position or are the tail MAXVALUE. + + current_position: a Position representing the position IDs for + this table at the evaluation_time. + + allowed_lifespan: a timedelta that represents how long a span of time + a partition should seek to cover. + + num_empty_partitions: the number of empty partitions to seek to keep at the + tail, each aiming to span allowed_lifespan. + + evaluation_time: a datetime instance that represents the time the + algorithm is running. + """ + + log = logging.getLogger("get_pending_sql_reorganize_partition_commands") + + partition_changes = _plan_partition_changes( + partition_list, + current_position, + evaluation_time, + allowed_lifespan, + num_empty_partitions, + ) + + if not _should_run_changes(partition_changes): + log.info(f"{table} does not need to be modified currently.") + return list() + + log.debug(f"{table} has changes waiting.") + return generate_sql_reorganize_partition_commands(table, partition_changes) diff --git a/partitionmanager/table_append_partition_test.py b/partitionmanager/table_append_partition_test.py index 6dafe8f..822eefd 100644 --- a/partitionmanager/table_append_partition_test.py +++ b/partitionmanager/table_append_partition_test.py @@ -11,7 +11,6 @@ MismatchedIdException, NewPlannedPartition, NoEmptyPartitionsAvailableException, - Partition, PositionPartition, SqlInput, Table, @@ -19,23 +18,24 @@ UnexpectedPartitionException, ) from partitionmanager.table_append_partition import ( - evaluate_partition_changes, + _generate_weights, + _get_position_increase_per_day, + _get_table_information_schema_problems, + _get_weighted_position_increase_per_day_for_partitions, + _parse_partition_map, + _plan_partition_changes, + _predict_forward_position, + _predict_forward_time, + _should_run_changes, + _split_partitions_around_position, generate_sql_reorganize_partition_commands, - generate_weights, get_current_positions, get_partition_map, - get_position_increase_per_day, - get_weighted_position_increase_per_day_for_partitions, - parse_partition_map, - plan_partition_changes, - predict_forward_position, - predict_forward_time, - split_partitions_around_positions, - table_information_schema_is_compatible, - table_is_compatible, + get_pending_sql_reorganize_partition_commands, + get_table_compatibility_problems, ) -from .types_test import mkPPart, mkTailPart +from .types_test import mkPPart, mkTailPart, mkPos class MockDatabase(DatabaseCommand): @@ -58,26 +58,33 @@ def test_get_partition_map(self): def test_get_autoincrement(self): self.assertEqual( - table_is_compatible(MockDatabase(), ""), "Unexpected table type: " + get_table_compatibility_problems(MockDatabase(), ""), + ["Unexpected table type: "], ) class TestParseTableInformationSchema(unittest.TestCase): def test_not_partitioned_and_unexpected(self): info = [{"CREATE_OPTIONS": "exfoliated, disenchanted"}] - self.assertIsNotNone(table_information_schema_is_compatible(info, "extable")) + self.assertEqual( + _get_table_information_schema_problems(info, "extable"), + ["Table extable is not partitioned"], + ) def test_not_partitioned(self): info = [{"CREATE_OPTIONS": "exfoliated"}] - self.assertIsNotNone(table_information_schema_is_compatible(info, "extable")) + self.assertEqual( + _get_table_information_schema_problems(info, "extable"), + ["Table extable is not partitioned"], + ) def test_normal(self): info = [{"CREATE_OPTIONS": "partitioned"}] - self.assertIsNone(table_information_schema_is_compatible(info, "table")) + self.assertEqual(_get_table_information_schema_problems(info, "table"), list()) def test_normal_multiple_create_options(self): info = [{"CREATE_OPTIONS": "magical, partitioned"}] - self.assertIsNone(table_information_schema_is_compatible(info, "table")) + self.assertEqual(_get_table_information_schema_problems(info, "table"), list()) class TestParsePartitionMap(unittest.TestCase): @@ -94,7 +101,7 @@ def test_single_partition(self): """, } ] - results = parse_partition_map(create_stmt) + results = _parse_partition_map(create_stmt) self.assertEqual(len(results["partitions"]), 1) self.assertEqual(results["partitions"][0], mkTailPart("p_20201204")) self.assertEqual(results["range_cols"], ["id"]) @@ -113,7 +120,7 @@ def test_two_partitions(self): """, } ] - results = parse_partition_map(create_stmt) + results = _parse_partition_map(create_stmt) self.assertEqual(len(results["partitions"]), 2) self.assertEqual(results["partitions"][0], mkPPart("before", 100)) self.assertEqual(results["partitions"][1], mkTailPart("p_20201204")) @@ -132,7 +139,7 @@ def test_dual_keys_single_partition(self): (PARTITION `p_start` VALUES LESS THAN (MAXVALUE, MAXVALUE) ENGINE = InnoDB)""", } ] - results = parse_partition_map(create_stmt) + results = _parse_partition_map(create_stmt) self.assertEqual(len(results["partitions"]), 1) self.assertEqual(results["partitions"][0], mkTailPart("p_start", count=2)) self.assertEqual(results["range_cols"], ["firstID", "secondID"]) @@ -151,7 +158,7 @@ def test_dual_keys_multiple_partitions(self): PARTITION `p_next` VALUES LESS THAN (MAXVALUE, MAXVALUE) ENGINE = InnoDB)""", } ] - results = parse_partition_map(create_stmt) + results = _parse_partition_map(create_stmt) self.assertEqual(len(results["partitions"]), 2) self.assertEqual(results["partitions"][0], mkPPart("p_start", 255, 1234567890)) self.assertEqual(results["partitions"][1], mkTailPart("p_next", count=2)) @@ -171,7 +178,7 @@ def test_missing_part_definition(self): } ] with self.assertRaises(TableInformationException): - parse_partition_map(create_stmt) + _parse_partition_map(create_stmt) def test_missing_part_definition_and_just_tail(self): create_stmt = [ @@ -186,7 +193,7 @@ def test_missing_part_definition_and_just_tail(self): } ] with self.assertRaises(TableInformationException): - parse_partition_map(create_stmt) + _parse_partition_map(create_stmt) def test_missing_part_tail(self): create_stmt = [ @@ -201,7 +208,7 @@ def test_missing_part_tail(self): } ] with self.assertRaises(UnexpectedPartitionException): - parse_partition_map(create_stmt) + _parse_partition_map(create_stmt) class TestSqlInput(unittest.TestCase): @@ -249,57 +256,57 @@ def test_get_position_two_columns(self): class TestPartitionAlgorithm(unittest.TestCase): def test_split(self): with self.assertRaises(UnexpectedPartitionException): - split_partitions_around_positions( - [mkPPart("a", 1), mkTailPart("z")], [10, 10] + _split_partitions_around_position( + [mkPPart("a", 1), mkTailPart("z")], mkPos(10, 10) ) with self.assertRaises(UnexpectedPartitionException): - split_partitions_around_positions( - [mkPPart("a", 1, 1), mkTailPart("z")], [10, 10] + _split_partitions_around_position( + [mkPPart("a", 1, 1), mkTailPart("z")], mkPos(10, 10) ) with self.assertRaises(UnexpectedPartitionException): - split_partitions_around_positions( - [mkPPart("a", 1), mkTailPart("z", count=2)], [10, 10] + _split_partitions_around_position( + [mkPPart("a", 1), mkTailPart("z", count=2)], mkPos(10, 10) ) self.assertEqual( - split_partitions_around_positions( - [mkPPart("a", 1), mkPPart("b", 2), mkTailPart("z")], [10] + _split_partitions_around_position( + [mkPPart("a", 1), mkPPart("b", 2), mkTailPart("z")], mkPos(10) ), ([mkPPart("a", 1), mkPPart("b", 2)], mkTailPart("z"), []), ) self.assertEqual( - split_partitions_around_positions( - [mkPPart("a", 100), mkPPart("b", 200), mkTailPart("z")], [10] + _split_partitions_around_position( + [mkPPart("a", 100), mkPPart("b", 200), mkTailPart("z")], mkPos(10) ), ([], mkPPart("a", 100), [mkPPart("b", 200), mkTailPart("z")]), ) self.assertEqual( - split_partitions_around_positions( - [mkPPart("a", 1), mkPPart("b", 10), mkTailPart("z")], [10] + _split_partitions_around_position( + [mkPPart("a", 1), mkPPart("b", 10), mkTailPart("z")], mkPos(10) ), ([mkPPart("a", 1)], mkPPart("b", 10), [mkTailPart("z")]), ) self.assertEqual( - split_partitions_around_positions( - [mkPPart("a", 1), mkPPart("b", 11), mkTailPart("z")], [10] + _split_partitions_around_position( + [mkPPart("a", 1), mkPPart("b", 11), mkTailPart("z")], mkPos(10) ), ([mkPPart("a", 1)], mkPPart("b", 11), [mkTailPart("z")]), ) self.assertEqual( - split_partitions_around_positions( + _split_partitions_around_position( [mkPPart("a", 1), mkPPart("b", 11), mkPPart("c", 11), mkTailPart("z")], - [10], + mkPos(10), ), ([mkPPart("a", 1)], mkPPart("b", 11), [mkPPart("c", 11), mkTailPart("z")]), ) self.assertEqual( - split_partitions_around_positions( + _split_partitions_around_position( [mkPPart("a", 1), mkPPart("b", 11), mkPPart("c", 11), mkTailPart("z")], - [0], + mkPos(0), ), ( [], @@ -309,9 +316,9 @@ def test_split(self): ) self.assertEqual( - split_partitions_around_positions( + _split_partitions_around_position( [mkPPart("a", 1), mkPPart("b", 11), mkPPart("c", 11), mkTailPart("z")], - [200], + mkPos(200), ), ( [mkPPart("a", 1), mkPPart("b", 11), mkPPart("c", 11)], @@ -321,9 +328,9 @@ def test_split(self): ) self.assertEqual( - split_partitions_around_positions( + _split_partitions_around_position( [mkPPart("a", 1, 100), mkPPart("b", 2, 200), mkTailPart("z", count=2)], - [10, 1000], + mkPos(10, 1000), ), ( [mkPPart("a", 1, 100), mkPPart("b", 2, 200)], @@ -333,74 +340,74 @@ def test_split(self): ) self.assertEqual( - split_partitions_around_positions( + _split_partitions_around_position( [mkPPart("a", 10, 10), mkPPart("b", 20, 20), mkTailPart("z", count=2)], - [19, 500], + mkPos(19, 500), ), ([mkPPart("a", 10, 10)], mkPPart("b", 20, 20), [mkTailPart("z", count=2)]), ) def test_get_position_increase_per_day(self): with self.assertRaises(ValueError): - get_position_increase_per_day( + _get_position_increase_per_day( mkTailPart("p_20201231"), mkPPart("p_20210101", 42) ) with self.assertRaises(ValueError): - get_position_increase_per_day( + _get_position_increase_per_day( mkPPart("p_20211231", 99), mkPPart("p_20210101", 42) ) with self.assertRaises(ValueError): - get_position_increase_per_day( + _get_position_increase_per_day( mkPPart("p_20201231", 1, 99), mkPPart("p_20210101", 42) ) self.assertEqual( - get_position_increase_per_day( + _get_position_increase_per_day( mkPPart("p_20201231", 0), mkPPart("p_20210101", 100) ), [100], ) self.assertEqual( - get_position_increase_per_day( + _get_position_increase_per_day( mkPPart("p_20201231", 0), mkPPart("p_20210410", 100) ), [1], ) self.assertEqual( - get_position_increase_per_day( + _get_position_increase_per_day( mkPPart("p_20201231", 0, 10), mkPPart("p_20210410", 100, 1000) ), [1, 9.9], ) def test_generate_weights(self): - self.assertEqual(generate_weights(1), [10000]) - self.assertEqual(generate_weights(3), [10000 / 3, 5000, 10000]) + self.assertEqual(_generate_weights(1), [10000]) + self.assertEqual(_generate_weights(3), [10000 / 3, 5000, 10000]) def test_get_weighted_position_increase_per_day_for_partitions(self): with self.assertRaises(ValueError): - get_weighted_position_increase_per_day_for_partitions(list()) + _get_weighted_position_increase_per_day_for_partitions(list()) self.assertEqual( - get_weighted_position_increase_per_day_for_partitions( + _get_weighted_position_increase_per_day_for_partitions( [mkPPart("p_20201231", 0), mkPPart("p_20210101", 100)] ), [100], ) self.assertEqual( - get_weighted_position_increase_per_day_for_partitions( + _get_weighted_position_increase_per_day_for_partitions( [mkPPart("p_20201231", 0), mkPPart("p_20210410", 100)] ), [1], ) self.assertEqual( - get_weighted_position_increase_per_day_for_partitions( + _get_weighted_position_increase_per_day_for_partitions( [mkPPart("p_20201231", 50, 50), mkPPart("p_20210410", 100, 500)] ), [0.5, 4.5], ) self.assertEqual( - get_weighted_position_increase_per_day_for_partitions( + _get_weighted_position_increase_per_day_for_partitions( [ mkPPart("p_20200922", 0), mkPPart("p_20201231", 100), # rate = 1/day @@ -410,7 +417,7 @@ def test_get_weighted_position_increase_per_day_for_partitions(self): [7], ) self.assertEqual( - get_weighted_position_increase_per_day_for_partitions( + _get_weighted_position_increase_per_day_for_partitions( [ mkPPart("p_20200922", 0), mkPPart("p_20201231", 100), # 1/day @@ -423,59 +430,71 @@ def test_get_weighted_position_increase_per_day_for_partitions(self): def test_predict_forward_position(self): with self.assertRaises(ValueError): - predict_forward_position([0], [1, 2], timedelta(days=1)) + _predict_forward_position([0], [1, 2], timedelta(days=1)) with self.assertRaises(ValueError): - predict_forward_position([1, 2], [3], timedelta(days=1)) + _predict_forward_position([1, 2], [3], timedelta(days=1)) with self.assertRaises(ValueError): - predict_forward_position([1, 2], [-1], timedelta(days=1)) + _predict_forward_position([1, 2], [-1], timedelta(days=1)) - self.assertEqual(predict_forward_position([0], [500], timedelta(days=1)), [500]) + self.assertEqual( + _predict_forward_position([0], [500], timedelta(days=1)), [500] + ) - self.assertEqual(predict_forward_position([0], [125], timedelta(days=4)), [500]) + self.assertEqual( + _predict_forward_position([0], [125], timedelta(days=4)), [500] + ) def test_predict_forward_time(self): t = datetime(2000, 1, 1) with self.assertRaises(ValueError): - predict_forward_time([0, 0], [100], [100], t) + _predict_forward_time(mkPos(0, 0), mkPos(100), [100], t) with self.assertRaises(ValueError): - predict_forward_time([0], [100, 0], [100], t) + _predict_forward_time(mkPos(0), mkPos(100, 0), [100], t) with self.assertRaises(ValueError): - predict_forward_time([0], [100, 0], [100, 100], t) + _predict_forward_time(mkPos(0), mkPos(100, 0), [100, 100], t) with self.assertRaises(ValueError): - predict_forward_time([0], [100], [100, 100], t) + _predict_forward_time(mkPos(0), mkPos(100), [100, 100], t) with self.assertRaises(ValueError): - predict_forward_time([0], [100], [-1], t) + _predict_forward_time(mkPos(0), mkPos(100), [-1], t) with self.assertRaises(ValueError): - predict_forward_time([100], [99], [1], t) + _predict_forward_time(mkPos(100), mkPos(99), [1], t) with self.assertRaises(ValueError): # We should never be asked to operate on positions in the incorrect # order - predict_forward_time([101, 101], [100, 100], [200, 200], t) + _predict_forward_time(mkPos(101, 101), mkPos(100, 100), [200, 200], t) + with self.assertRaises(ValueError): + # Nonzero rates of change are bad too. + _predict_forward_time(mkPos(0, 0, 0), mkPos(100, 100, 100), [1, 1, 0], t) self.assertEqual( - predict_forward_time([0], [100], [100], t), t + timedelta(hours=24) + _predict_forward_time(mkPos(0), mkPos(100), [100], t), + t + timedelta(hours=24), ) self.assertEqual( - predict_forward_time([0], [100], [200], t), t + timedelta(hours=12) + _predict_forward_time(mkPos(0), mkPos(100), [200], t), + t + timedelta(hours=12), ) self.assertEqual( - predict_forward_time([0], [100], [200], t), t + timedelta(hours=12) + _predict_forward_time(mkPos(0), mkPos(100), [200], t), + t + timedelta(hours=12), ) # It must be OK to have some positions already well beyond the endpoint self.assertEqual( - predict_forward_time([0, 200], [100, 100], [200, 200], t), + _predict_forward_time(mkPos(0, 200), mkPos(100, 100), [200, 200], t), t + timedelta(hours=12), ) - self.assertEqual(predict_forward_time([100, 100], [100, 100], [200, 200], t), t) + self.assertEqual( + _predict_forward_time(mkPos(100, 100), mkPos(100, 100), [200, 200], t), t + ) def test_plan_partition_changes_no_empty_partitions(self): with self.assertRaises(NoEmptyPartitionsAvailableException): - plan_partition_changes( + _plan_partition_changes( [mkPPart("p_20201231", 0), mkPPart("p_20210102", 200)], - [50], + mkPos(50), datetime(2021, 1, 1, tzinfo=timezone.utc), timedelta(days=7), 2, @@ -483,13 +502,13 @@ def test_plan_partition_changes_no_empty_partitions(self): def test_plan_partition_changes_imminent(self): with self.assertLogs("plan_partition_changes", level="INFO") as logctx: - planned = plan_partition_changes( + planned = _plan_partition_changes( [ mkPPart("p_20201231", 100), mkPPart("p_20210102", 200), mkTailPart("future"), ], - [50], + mkPos(50), datetime(2021, 1, 1, hour=23, minute=55, tzinfo=timezone.utc), timedelta(days=2), 3, @@ -523,13 +542,13 @@ def test_plan_partition_changes_imminent(self): def test_plan_partition_changes_wildly_off_dates(self): with self.assertLogs("plan_partition_changes", level="INFO") as logctx: - planned = plan_partition_changes( + planned = _plan_partition_changes( [ mkPPart("p_20201231", 100), mkPPart("p_20210104", 200), mkTailPart("future"), ], - [50], + mkPos(50), datetime(2021, 1, 1, tzinfo=timezone.utc), timedelta(days=7), 2, @@ -559,13 +578,13 @@ def test_plan_partition_changes_wildly_off_dates(self): ) def test_plan_partition_changes_long_delay(self): - planned = plan_partition_changes( + planned = _plan_partition_changes( [ mkPPart("p_20210101", 100), mkPPart("p_20210415", 200), mkTailPart("future"), ], - [50], + mkPos(50), datetime(2021, 3, 31, tzinfo=timezone.utc), timedelta(days=7), 2, @@ -585,14 +604,14 @@ def test_plan_partition_changes_long_delay(self): ) def test_plan_partition_changes_short_names(self): - planned = plan_partition_changes( + planned = _plan_partition_changes( [ mkPPart("p_2019", 1912499867), mkPPart("p_2020", 8890030931), mkPPart("p_20210125", 12010339136), mkTailPart("p_future"), ], - [10810339136], + mkPos(10810339136), datetime(2021, 1, 30, tzinfo=timezone.utc), timedelta(days=7), 2, @@ -626,9 +645,9 @@ def test_plan_partition_changes_short_names(self): ) def test_plan_partition_changes_bespoke_names(self): - planned = plan_partition_changes( + planned = _plan_partition_changes( [mkPPart("p_start", 100), mkTailPart("p_future")], - [50], + mkPos(50), datetime(2021, 1, 6, tzinfo=timezone.utc), timedelta(days=7), 2, @@ -660,13 +679,13 @@ def test_plan_partition_changes_bespoke_names(self): ) def test_plan_partition_changes(self): - planned = plan_partition_changes( + planned = _plan_partition_changes( [ mkPPart("p_20201231", 100), mkPPart("p_20210102", 200), mkTailPart("future"), ], - [50], + mkPos(50), datetime(2021, 1, 1, tzinfo=timezone.utc), timedelta(days=7), 2, @@ -684,13 +703,13 @@ def test_plan_partition_changes(self): ) self.assertEqual( - plan_partition_changes( + _plan_partition_changes( [ mkPPart("p_20201231", 100), mkPPart("p_20210102", 200), mkTailPart("future"), ], - [199], + mkPos(199), datetime(2021, 1, 3, tzinfo=timezone.utc), timedelta(days=7), 3, @@ -709,15 +728,47 @@ def test_plan_partition_changes(self): ], ) - def test_evaluate_partition_changes(self): + def test_plan_partition_changes_misprediction(self): + """ We have to handle the case where the partition list doesn't cleanly + match reality. """ + planned = _plan_partition_changes( + [ + mkPPart("p_20210505", 9505010028), + mkPPart("p_20210604", 10152257517), + mkPPart("p_20210704", 10799505006), + mkTailPart("p_20210803"), + ], + mkPos(10264818175), + datetime(2021, 6, 8, tzinfo=timezone.utc), + timedelta(days=30), + 3, + ) + + self.assertEqual( + planned, + [ + ChangePlannedPartition(mkPPart("p_20210704", 10799505006)), + ChangePlannedPartition(mkTailPart("p_20210803")).set_position( + [11578057459] + ), + NewPlannedPartition() + .set_position([12356609912]) + .set_timestamp(datetime(2021, 9, 2, tzinfo=timezone.utc)), + NewPlannedPartition() + .set_columns(1) + .set_timestamp(datetime(2021, 10, 2, tzinfo=timezone.utc)), + ], + ) + + def test_should_run_changes(self): self.assertFalse( - evaluate_partition_changes( + _should_run_changes( [ChangePlannedPartition(mkPPart("p_20210102", 200)).set_position([300])] ) ) self.assertFalse( - evaluate_partition_changes( + _should_run_changes( [ ChangePlannedPartition(mkPPart("p_20210102", 200)).set_position( [300] @@ -728,9 +779,9 @@ def test_evaluate_partition_changes(self): ] ) ) - with self.assertLogs("evaluate_partition_changes", level="DEBUG") as logctx: + with self.assertLogs("should_run_changes", level="DEBUG") as logctx: self.assertTrue( - evaluate_partition_changes( + _should_run_changes( [ ChangePlannedPartition(mkPPart("p_20210102", 200)).set_position( [302] @@ -749,15 +800,12 @@ def test_evaluate_partition_changes(self): ) self.assertEqual( logctx.output, - [ - "DEBUG:evaluate_partition_changes:Add: [542] 2021-01-16 " - "00:00:00+00:00 is new" - ], + ["DEBUG:should_run_changes:Add: [542] 2021-01-16 " "00:00:00+00:00 is new"], ) - with self.assertLogs("evaluate_partition_changes", level="DEBUG") as logctx: + with self.assertLogs("should_run_changes", level="DEBUG") as logctx: self.assertTrue( - evaluate_partition_changes( + _should_run_changes( [ ChangePlannedPartition(mkPPart("p_20210102", 200)), NewPlannedPartition() @@ -771,13 +819,10 @@ def test_evaluate_partition_changes(self): ) self.assertEqual( logctx.output, - [ - "DEBUG:evaluate_partition_changes:Add: [542] 2021-01-16 " - "00:00:00+00:00 is new" - ], + ["DEBUG:should_run_changes:Add: [542] 2021-01-16 " "00:00:00+00:00 is new"], ) - def test_generate_sql_reorganize_partition_commands_no_change(self): + def testgenerate_sql_reorganize_partition_commands_no_change(self): self.assertEqual( list( generate_sql_reorganize_partition_commands( @@ -787,7 +832,7 @@ def test_generate_sql_reorganize_partition_commands_no_change(self): [], ) - def test_generate_sql_reorganize_partition_commands_single_change(self): + def testgenerate_sql_reorganize_partition_commands_single_change(self): self.assertEqual( list( generate_sql_reorganize_partition_commands( @@ -805,7 +850,7 @@ def test_generate_sql_reorganize_partition_commands_single_change(self): ], ) - def test_generate_sql_reorganize_partition_commands_two_changes(self): + def testgenerate_sql_reorganize_partition_commands_two_changes(self): self.assertEqual( list( generate_sql_reorganize_partition_commands( @@ -828,7 +873,7 @@ def test_generate_sql_reorganize_partition_commands_two_changes(self): ], ) - def test_generate_sql_reorganize_partition_commands_new_partitions(self): + def testgenerate_sql_reorganize_partition_commands_new_partitions(self): self.assertEqual( list( generate_sql_reorganize_partition_commands( @@ -852,7 +897,7 @@ def test_generate_sql_reorganize_partition_commands_new_partitions(self): ], ) - def test_generate_sql_reorganize_partition_commands_maintain_new_partition(self): + def testgenerate_sql_reorganize_partition_commands_maintain_new_partition(self): self.assertEqual( list( generate_sql_reorganize_partition_commands( @@ -882,7 +927,7 @@ def test_generate_sql_reorganize_partition_commands_maintain_new_partition(self) ], ) - def test_generate_sql_reorganize_partition_commands_with_duplicate(self): + def testgenerate_sql_reorganize_partition_commands_with_duplicate(self): with self.assertRaises(DuplicatePartitionException): list( generate_sql_reorganize_partition_commands( @@ -901,16 +946,35 @@ def test_generate_sql_reorganize_partition_commands_with_duplicate(self): ) ) - def test_plan_and_generate_sql_reorganize_partition_commands_with_future_partition( + def testgenerate_sql_reorganize_partition_commands_out_of_order(self): + with self.assertRaises(AssertionError): + list( + generate_sql_reorganize_partition_commands( + Table("table_with_out_of_order_changeset"), + [ + ChangePlannedPartition(mkTailPart("past")) + .set_position([800]) + .set_timestamp(datetime(2021, 1, 14, tzinfo=timezone.utc)), + NewPlannedPartition() + .set_position([1000]) + .set_timestamp(datetime(2021, 1, 15, tzinfo=timezone.utc)), + ChangePlannedPartition(mkTailPart("future")) + .set_position([1200]) + .set_timestamp(datetime(2021, 1, 16, tzinfo=timezone.utc)), + ], + ) + ) + + def test_plan_andgenerate_sql_reorganize_partition_commands_with_future_partition( self ): - planned = plan_partition_changes( + planned = _plan_partition_changes( [ mkPPart("p_20201231", 100), mkPPart("p_20210104", 200), mkTailPart("future"), ], - [50], + mkPos(50), datetime(2021, 1, 1, tzinfo=timezone.utc), timedelta(days=7), 2, @@ -926,6 +990,68 @@ def test_plan_and_generate_sql_reorganize_partition_commands_with_future_partiti ], ) + def test_get_pending_sql_reorganize_partition_commands_no_changes(self): + with self.assertLogs( + "get_pending_sql_reorganize_partition_commands", level="INFO" + ) as logctx: + cmds = get_pending_sql_reorganize_partition_commands( + table=Table("plushies"), + partition_list=[ + mkPPart("p_20201231", 100), + mkPPart("p_20210102", 200), + mkTailPart("future"), + ], + current_position=mkPos(50), + allowed_lifespan=timedelta(days=7), + num_empty_partitions=2, + evaluation_time=datetime(2021, 1, 1, tzinfo=timezone.utc), + ) + + self.assertEqual( + logctx.output, + [ + "INFO:get_pending_sql_reorganize_partition_commands:" + "Table plushies does not need to be modified currently." + ], + ) + + self.assertEqual(cmds, []) + + def test_get_pending_sql_reorganize_partition_commands_with_changes(self): + with self.assertLogs( + "get_pending_sql_reorganize_partition_commands", level="DEBUG" + ) as logctx: + cmds = get_pending_sql_reorganize_partition_commands( + table=Table("plushies"), + partition_list=[ + mkPPart("p_20201231", 100), + mkPPart("p_20210102", 200), + mkTailPart("future"), + ], + current_position=mkPos(50), + allowed_lifespan=timedelta(days=7), + num_empty_partitions=4, + evaluation_time=datetime(2021, 1, 1, tzinfo=timezone.utc), + ) + + self.assertEqual( + logctx.output, + [ + "DEBUG:get_pending_sql_reorganize_partition_commands:" + "Table plushies has changes waiting." + ], + ) + + self.assertEqual( + list(cmds), + [ + "ALTER TABLE `plushies` REORGANIZE PARTITION `future` INTO " + "(PARTITION `p_20210109` VALUES LESS THAN (550), " + "PARTITION `p_20210116` VALUES LESS THAN (900), " + "PARTITION `p_20210123` VALUES LESS THAN MAXVALUE);" + ], + ) + if __name__ == "__main__": unittest.main() diff --git a/partitionmanager/tools.py b/partitionmanager/tools.py index 91796a1..caccbef 100644 --- a/partitionmanager/tools.py +++ b/partitionmanager/tools.py @@ -6,21 +6,17 @@ def pairwise(iterable): - """ - iterable -> (s0,s1), (s1,s2), (s2, s3), ... (s_n-1, s_n). - """ + """iterable -> (s0,s1), (s1,s2), (s2, s3), ... (s_n-1, s_n).""" a, b = tee(iterable) next(b, None) return zip(a, b) def iter_show_end(iterable): - """ - iterable -> (s0, false), (s1, false), ... (s_n, true). - """ + """iterable -> (s0, false), (s1, false), ... (s_n, true).""" it = iter(iterable) - last = next(it) + prev = next(it) for val in it: - yield last, False - last = val - yield last, True + yield prev, False + prev = val + yield prev, True diff --git a/partitionmanager/types.py b/partitionmanager/types.py index b1f3f83..4f0981e 100644 --- a/partitionmanager/types.py +++ b/partitionmanager/types.py @@ -9,10 +9,10 @@ from urllib.parse import urlparse -def retention_from_dict(r): +def timedelta_from_dict(r): """ Process a dictionary, typically from YAML, which describes a table's - retetntion period. Returns a timedelta or None, and raises an argparse + retention or partition period. Returns a timedelta or None, and raises an argparse error if the arguments are not understood. """ for k, v in r.items(): @@ -74,7 +74,7 @@ def __repr__(self): return str(self) -def toSqlUrl(urlstring): +def to_sql_url(urlstring): """ Parse a sql://user:pass@host:port/schema URL and return the tuple. """ @@ -109,55 +109,57 @@ def db_name(self): """ -class Partition(abc.ABC): - """ - Abstract class which represents a single, currently-defined SQL table - partition. The subclasses represent: a partition with position information, - PositionPartition; those which are the tail partition and catch IDs beyond - the defined positions, MaxValuePartition; and a helper class, - InstantPartition, which is only used temporarily and never stored. - """ +def is_partition_type(obj): + """ True if the object inherits from a _Partition. """ + return isinstance(obj, _Partition) + + +class _Partition(abc.ABC): + """Abstract class which represents a existing table partition.""" @abc.abstractmethod def values(self): - """ - Return a SQL partition value string. - """ + """Return a SQL partition value string.""" @property @abc.abstractmethod def name(self): - """ - Return the partition's name, which should generally represent the - date that the partition begins to fill, of the form p_yyyymmdd + """Name representing when the partition began to fill. + + Generally this will be of the form p_yyyymmdd, but sometimes partitions + have names like p_initial, p_start, or any other valid SQL identifier. """ @property @abc.abstractmethod def num_columns(self): - """ - Return the number of columns this partition represents - """ + """Return the number of columns included in this partition's range.""" @property - def has_time(self): - """ - True if the partition has a timestamp, e.g. if timestamp() can be - reasonably assumed to be non-None. Doesn't gaurantee, as this only - allows for names to be of the form p_start or p_YYYY[MM[DD]]. + def has_real_time(self): + """True if the partition has a non-synthetic timestamp. + + This should be used to determine whether timestamp() should be used for + statistical purposes, as timestamp() generates a synthetic timestamp + for rate-of-change calculations in corner-cases. """ - if "start" in self.name: + if "p_start" in self.name or not self.name.startswith("p_"): return False - return True + return self.timestamp() is not None def timestamp(self): + """Returns datetime of this partition's date, or None. + + This returns the date from the partition's name if the partition is of + the form "p_YYYYMMDD". If the name is "p_start", return a synthetic + timestamp (be sure to use self.has_real_time before using for + statistical purposes). Otherwise, returns None. """ - Returns a datetime object representing this partition's - date, if the partition is of the form "p_YYYYMMDD", otherwise - returns None - """ - if not self.has_time: + if not self.name.startswith("p_"): + return None + + if "p_start" in self.name: # Gotta start somewhere, for partitions named things like # "p_start". This has the downside of causing abnormally-low # rate of change calculations, but they fall off quickly @@ -176,7 +178,6 @@ def timestamp(self): return datetime.strptime(self.name, "p_%Y").replace(tzinfo=timezone.utc) except ValueError: pass - return None def __repr__(self): @@ -186,70 +187,122 @@ def __str__(self): return f"{self.name}: {self.values()}" -class PositionPartition(Partition): +class Position: + """ An internal class that represents a position as an ordered list of + identifiers, matching the table's partition-by statement. """ - A partition that may have positions assocated with it. + + def __init__(self): + self._position = list() + + def set_position(self, position_in): + """Set the list of identifiers for this position.""" + if isinstance(position_in, Position): + self._position = position_in.as_list() + elif isinstance(position_in, list) or isinstance(position_in, tuple): + self._position = [int(p) for p in position_in] + else: + raise ValueError(f"Unexpected position input: {position_in}") + return self + + def as_list(self): + """Return a copy of the list of identifiers representing this position""" + return self._position.copy() + + def __len__(self): + return len(self._position) + + def __eq__(self, other): + if isinstance(other, Position): + return self._position == other.as_list() + return False + + def __str__(self): + return str(self._position) + + def __repr__(self): + return repr(self._position) + + +class PositionPartition(_Partition): + """A partition that has a position assocated with it. + + Partitions are independent table segments, and each has a name and a current + position. The positions-list is an ordered list of identifiers, matching + the order of the table's partition-by statement when the table was created. """ def __init__(self, name): self._name = name - self.positions = list() + self._position = Position() @property def name(self): return self._name - def set_position(self, positions): - """ - Set the position list for this partition. - """ - self.positions = [int(p) for p in positions] + def set_position(self, position_in): + """Set the position for this partition.""" + self._position.set_position(position_in) return self + @property + def position(self): + """Return the Position this partition represents""" + return self._position + @property def num_columns(self): - return len(self.positions) + return len(self._position) def values(self): - return "(" + ", ".join([str(x) for x in self.positions]) + ")" + return "(" + ", ".join([str(x) for x in self._position.as_list()]) + ")" def __lt__(self, other): if isinstance(other, MaxValuePartition): - if len(self.positions) != other.num_columns: + if len(self._position) != other.num_columns: raise UnexpectedPartitionException( - f"Expected {len(self.positions)} columns but " + f"Expected {len(self._position)} columns but " f"partition has {other.num_columns}." ) return True - other_positions = None + + other_position_list = None if isinstance(other, list): - other_positions = other + other_position_list = other + elif isinstance(other, Position): + other_position_list = other.as_list() elif isinstance(other, PositionPartition): - other_positions = other.positions - if not other_positions or len(self.positions) != len(other_positions): + other_position_list = other.position.as_list() + + if not other_position_list or len(self._position) != len(other_position_list): raise UnexpectedPartitionException( - f"Expected {len(self.positions)} columns but partition has {other_positions}." + f"Expected {len(self._position)} columns but partition has {other_position_list}." ) - for v_mine, v_other in zip(self.positions, other_positions): + + for v_mine, v_other in zip(self._position.as_list(), other_position_list): if v_mine >= v_other: return False return True def __eq__(self, other): if isinstance(other, PositionPartition): - return self.name == other.name and self.positions == other.positions - return False + return self.name == other.name and self._position == other.position + elif isinstance(other, MaxValuePartition): + return False + raise ValueError(f"Unexpected equality with {other}") -class MaxValuePartition(Partition): - """ - A partition that lives at the tail of a partition list, saying - all remaining values belong in this partition. + +class MaxValuePartition(_Partition): + """A partition that includes all remaining values. + + This kind of partition always resides at the tail of the partition list, + and is defined as containing values up to the reserved keyword MAXVALUE. """ def __init__(self, name, count): self._name = name - self.count = count + self._count = count @property def name(self): @@ -257,187 +310,191 @@ def name(self): @property def num_columns(self): - return self.count + return self._count def values(self): - return ", ".join(["MAXVALUE"] * self.count) + return ", ".join(["MAXVALUE"] * self._count) def __lt__(self, other): - """ - MaxValuePartitions are always greater than every other partition - """ - if isinstance(other, list): - if self.count != len(other): + """MaxValuePartitions are always greater than every other partition.""" + if isinstance(other, list) or isinstance(other, Position): + if self._count != len(other): raise UnexpectedPartitionException( - f"Expected {self.count} columns but list has {len(other)}." + f"Expected {self._count} columns but list has {len(other)}." ) return False - if isinstance(other, Partition): - if self.count != other.num_columns: + if is_partition_type(other): + if self._count != other.num_columns: raise UnexpectedPartitionException( - f"Expected {self.count} columns but list has {other.num_columns}." + f"Expected {self._count} columns but list has {other.num_columns}." ) return False return ValueError() def __eq__(self, other): if isinstance(other, MaxValuePartition): - return self.name == other.name and self.count == other.count - return False + return self.name == other.name and self._count == other.num_columns + elif isinstance(other, PositionPartition): + return False + raise ValueError(f"Unexpected equality with {other}") class InstantPartition(PositionPartition): - """ - Represent a partition at the current moment, used for rate calculations - as a stand-in that only exists for the purposes of the rate calculation - itself. + """Represent a partition at the current moment. + + Used for rate calculations as a stand-in that only exists for the purposes + of the rate calculation itself. """ - def __init__(self, now, positions): + def __init__(self, now, position_in): super().__init__("Instant") - self.instant = now - self.positions = positions + self._instant = now + self._position.set_position(position_in) def timestamp(self): - return self.instant + return self._instant -class PlannedPartition(abc.ABC): - """ - An abstract class representing a partition this tool plans to emit. If - the partition is an edit to an existing one, it will be the concrete type - ChangePlannedPartition. For new partitions, it'll be NewPlannedPartition. +class _PlannedPartition(abc.ABC): + """Represents a partition this tool plans to emit. + + The method as_partition will make this a concrete type for later evaluation. """ def __init__(self): - self.num_columns = None - self.positions = None + self._num_columns = None + self._position = None self._timestamp = None self._important = False def set_timestamp(self, timestamp): - """ - Set the timestamp to be used for the modified partition. This - effectively changes the partition's name. + """Set the timestamp to be used for the modified partition. + + This effectively changes the partition's name. """ self._timestamp = timestamp.replace(hour=0, minute=0) return self - def set_position(self, pos): - """ - Set the position of this modified partition. If this partition - changes an existing partition, the positions of both must have - identical length. + def set_position(self, position_in): + """Set the position of this modified partition. + + If this partition changes an existing partition, the positions of both + must have identical length. """ - if not isinstance(pos, list): - raise ValueError() + pos = Position() + pos.set_position(position_in) + if self.num_columns is not None and len(pos) != self.num_columns: raise UnexpectedPartitionException( - f"Expected {self.num_columns} columns but list has {len(pos)}." + f"Expected {self.num_columns} columns but input has {len(pos)}." ) - self.positions = pos + + self._position = pos return self def set_important(self): - """ - Indicate this is an important partition. - """ + """Indicate this is an important partition. Used in the + _plan_partition_changes as a marker that there's a significant + change in this partition that should be committed even if the + overall map isn't changing much. """ self._important = True return self + @property + def position(self): + """Get the position for this modified partition.""" + return self._position + def timestamp(self): - """ - The timestamp of this partition. - """ + """The timestamp of this partition.""" return self._timestamp def important(self): - """ - Whether this modified Partition is itself important enough to ensure - commitment. - """ + """True if this Partition is important enough to ensure commitment.""" return self._important @property @abc.abstractmethod def has_modifications(self): - """ - True if this partition modifies another partition. - """ + """True if this partition modifies another partition.""" + + @property + def num_columns(self): + """Return the number of columns this partition represents.""" + return self._num_columns def set_as_max_value(self): - """ - Make this partition represent MAXVALUE and be represented by a - MaxValuePartition by the as_partition method. - """ - self.num_columns = len(self.positions) - self.positions = None + """Represent this partition by MaxValuePartition from as_partition()""" + self._num_columns = len(self._position) + self._position = None return self def as_partition(self): - """ - Convert this from a Planned Partition to a Partition, which can then be - rendered into a SQL ALTER. - """ + """Return a concrete Partition that can be rendered into a SQL ALTER.""" if not self._timestamp: raise ValueError() - if self.positions: + if self._position: return PositionPartition(f"p_{self._timestamp:%Y%m%d}").set_position( - self.positions + self._position ) - return MaxValuePartition(f"p_{self._timestamp:%Y%m%d}", count=self.num_columns) + return MaxValuePartition(f"p_{self._timestamp:%Y%m%d}", count=self._num_columns) def __repr__(self): return f"{type(self).__name__}<{str(self)}>" def __eq__(self, other): - if isinstance(other, PlannedPartition): + if isinstance(other, _PlannedPartition): return ( isinstance(self, type(other)) - and self.positions == other.positions + and self.position == other.position and self.timestamp() == other.timestamp() and self.important() == other.important() ) return False -class ChangePlannedPartition(PlannedPartition): - """ - Represents modifications to a Partition supplied during construction. Use - the parent class' methods to alter this change. +class ChangePlannedPartition(_PlannedPartition): + """Represents modifications to a Partition supplied during construction. + + Use the parent class' methods to alter this change. """ def __init__(self, old_part): - if not isinstance(old_part, Partition): + if not is_partition_type(old_part): raise ValueError() super().__init__() - self.old = old_part - self.num_columns = self.old.num_columns - self._timestamp = self.old.timestamp() - self._old_positions = ( - self.old.positions if isinstance(old_part, PositionPartition) else None + self._old = old_part + self._num_columns = self._old.num_columns + self._timestamp = self._old.timestamp() + self._old_position = ( + self._old.position if isinstance(old_part, PositionPartition) else None ) - self.positions = self._old_positions + self._position = self._old_position @property def has_modifications(self): return ( - self.positions != self._old_positions - or self.old.timestamp() is None + self._position != self._old_position + or self._old.timestamp() is None and self._timestamp is not None - or self._timestamp.date() != self.old.timestamp().date() + or self._timestamp.date() != self._old.timestamp().date() ) + @property + def old(self): + """Get the partition to be modified""" + return self._old + def __str__(self): imp = "[!!]" if self.important() else "" - return f"{self.old} => {self.positions} {imp} {self._timestamp}" + return f"{self._old} => {self.position} {imp} {self._timestamp}" -class NewPlannedPartition(PlannedPartition): - """ - Represents a wholly new Partition to be constructed. After construction, - you must set the number of columns using set_columns before attempting - to use this in a plan. +class NewPlannedPartition(_PlannedPartition): + """Represents a wholly new Partition to be constructed. + + After construction, you must set the number of columns using set_columns + before attempting to use this in a plan. """ def __init__(self): @@ -445,11 +502,9 @@ def __init__(self): self.set_important() def set_columns(self, count): - """ - Set the number of columns needed to represent a position for this - partition. - """ - self.num_columns = count + """Set the number of columns needed to represent a position for this + partition.""" + self._num_columns = count return self @property @@ -457,40 +512,28 @@ def has_modifications(self): return False def __str__(self): - return f"Add: {self.positions} {self._timestamp}" + return f"Add: {self.position} {self._timestamp}" class MismatchedIdException(Exception): - """ - Raised if the partition map doesn't use the primary key as its range id. - """ + """ Raised if the partition map doesn't use the primary key as its range id.""" class TruncatedDatabaseResultException(Exception): - """ - Raised if the XML schema truncated over a subprocess interaction - """ + """Raised if the XML schema truncated over a subprocess interaction""" class DuplicatePartitionException(Exception): - """ - Raise if a partition being created already exists. - """ + """Raise if a partition being created already exists.""" class UnexpectedPartitionException(Exception): - """ - Raised when the partition map is unexpected. - """ + """Raised when the partition map is unexpected.""" class TableInformationException(Exception): - """ - Raised when the table's status doesn't include the information we need. - """ + """Raised when the table's status doesn't include the information we need.""" class NoEmptyPartitionsAvailableException(Exception): - """ - Raised if no empty partitions are available to safely modify. - """ + """Raised if no empty partitions are available to safely modify.""" diff --git a/partitionmanager/types_test.py b/partitionmanager/types_test.py index b266c94..176def5 100644 --- a/partitionmanager/types_test.py +++ b/partitionmanager/types_test.py @@ -4,19 +4,27 @@ from .types import ( ChangePlannedPartition, InstantPartition, + is_partition_type, MaxValuePartition, NewPlannedPartition, + Position, PositionPartition, - retention_from_dict, + timedelta_from_dict, SqlInput, Table, - toSqlUrl, + to_sql_url, UnexpectedPartitionException, ) +def mkPos(*pos): + p = Position() + p.set_position(pos) + return p + + def mkPPart(name, *pos): - return PositionPartition(name).set_position(pos) + return PositionPartition(name).set_position(mkPos(*pos)) def mkTailPart(name, count=1): @@ -26,43 +34,43 @@ def mkTailPart(name, count=1): class TestTypes(unittest.TestCase): def test_dburl_invalid(self): with self.assertRaises(argparse.ArgumentTypeError): - toSqlUrl("http://localhost/dbname") + to_sql_url("http://localhost/dbname") def test_dburl_without_db_path(self): with self.assertRaises(argparse.ArgumentTypeError): - toSqlUrl("sql://localhost") + to_sql_url("sql://localhost") with self.assertRaises(argparse.ArgumentTypeError): - toSqlUrl("sql://localhost/") + to_sql_url("sql://localhost/") def test_dburl_with_two_passwords(self): - u = toSqlUrl("sql://username:password:else@localhost:3306/database") + u = to_sql_url("sql://username:password:else@localhost:3306/database") self.assertEqual(u.username, "username") self.assertEqual(u.password, "password:else") self.assertEqual(u.port, 3306) def test_dburl_with_port(self): - u = toSqlUrl("sql://localhost:3306/database") + u = to_sql_url("sql://localhost:3306/database") self.assertEqual(u.hostname, "localhost") self.assertEqual(u.username, None) self.assertEqual(u.password, None) self.assertEqual(u.port, 3306) def test_dburl_with_no_port(self): - u = toSqlUrl("sql://localhost/database") + u = to_sql_url("sql://localhost/database") self.assertEqual(u.hostname, "localhost") self.assertEqual(u.username, None) self.assertEqual(u.password, None) self.assertEqual(u.port, None) def test_dburl_with_user_pass_and_no_port(self): - u = toSqlUrl("sql://username:password@localhost/database") + u = to_sql_url("sql://username:password@localhost/database") self.assertEqual(u.hostname, "localhost") self.assertEqual(u.username, "username") self.assertEqual(u.password, "password") self.assertEqual(u.port, None) def test_dburl_with_user_pass_and_port(self): - u = toSqlUrl("sql://username:password@localhost:911/database") + u = to_sql_url("sql://username:password@localhost:911/database") self.assertEqual(u.hostname, "localhost") self.assertEqual(u.username, "username") self.assertEqual(u.password, "password") @@ -83,18 +91,18 @@ def test_table(self): ) with self.assertRaises(argparse.ArgumentTypeError): - retention_from_dict({"something": 1}) + timedelta_from_dict({"something": 1}) with self.assertRaises(argparse.ArgumentTypeError): - retention_from_dict({"another thing": 1, "days": 30}) + timedelta_from_dict({"another thing": 1, "days": 30}) - r = retention_from_dict(dict()) + r = timedelta_from_dict(dict()) self.assertEqual(None, r) with self.assertRaises(TypeError): - retention_from_dict({"days": "thirty"}) + timedelta_from_dict({"days": "thirty"}) - r = retention_from_dict({"days": 30}) + r = timedelta_from_dict({"days": 30}) self.assertEqual(timedelta(days=30), r) def test_changed_partition(self): @@ -119,7 +127,7 @@ def test_changed_partition(self): self.assertTrue(c.has_modifications) self.assertEqual(c.timestamp(), datetime(2021, 1, 2)) - self.assertEqual(c.positions, [10, 10, 10, 10]) + self.assertEqual(c.position.as_list(), [10, 10, 10, 10]) self.assertEqual( c.as_partition(), @@ -130,7 +138,7 @@ def test_changed_partition(self): MaxValuePartition("p_20210101", count=1) ).set_position([1949]) self.assertEqual(c_max.timestamp(), datetime(2021, 1, 1, tzinfo=timezone.utc)) - self.assertEqual(c_max.positions, [1949]) + self.assertEqual(c_max.position.as_list(), [1949]) self.assertEqual( ChangePlannedPartition( @@ -207,7 +215,7 @@ def test_new_partition(self): .set_position([3]) .set_timestamp(datetime(2021, 12, 31)) .as_partition(), - PositionPartition("p_20211231").set_position([3]), + PositionPartition("p_20211231").set_position(mkPos(3)), ) self.assertEqual( @@ -238,12 +246,20 @@ def test_new_partition(self): class TestPartition(unittest.TestCase): def test_partition_timestamps(self): - self.assertIsNone(PositionPartition("").timestamp()) + self.assertFalse(PositionPartition("p_start").has_real_time) + self.assertEqual( + PositionPartition("p_start").timestamp(), + datetime(2021, 1, 1, tzinfo=timezone.utc), + ) + self.assertFalse(PositionPartition("not_a_date").has_real_time) self.assertIsNone(PositionPartition("not_a_date").timestamp()) + self.assertFalse(PositionPartition("p_202012310130").has_real_time) self.assertIsNone(PositionPartition("p_202012310130").timestamp()) + + self.assertTrue(PositionPartition("p_20011231").has_real_time) self.assertEqual( - PositionPartition("p_20201231").timestamp(), - datetime(2020, 12, 31, tzinfo=timezone.utc), + PositionPartition("p_20011231").timestamp(), + datetime(2001, 12, 31, tzinfo=timezone.utc), ) self.assertLess(mkPPart("a", 9), mkPPart("b", 11)) @@ -271,6 +287,13 @@ def test_instant_partition(self): now = datetime.utcnow() ip = InstantPartition(now, [1, 2]) - self.assertEqual(ip.positions, [1, 2]) + self.assertEqual(ip.position.as_list(), [1, 2]) self.assertEqual(ip.name, "Instant") self.assertEqual(ip.timestamp(), now) + + def test_is_partition_type(self): + self.assertTrue(is_partition_type(mkPPart("b", 1, 2))) + self.assertTrue(is_partition_type(InstantPartition(datetime.utcnow(), [1, 2]))) + self.assertFalse(is_partition_type(None)) + self.assertFalse(is_partition_type(1)) + self.assertFalse(is_partition_type(NewPlannedPartition()))