From 719a4c2255e3eb98ff8f6de2ce5e2f5edd893236 Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Mon, 20 Sep 2021 16:52:06 -0700 Subject: [PATCH] Handle mispredictions cleanly In the real world, we might mess up when naming a partition. This should be rare if partitionmanager is running often, since it'll rename partitions to match reality, but when it's running only rarely, things get out of date. This change avoids attempting to calculate rates-of-change using partitions that don't make sense - e.g., today is July 1, and our active partition says it starts in a week. That is plainly wrong, but we can still use our current rate-of-change. This expands on PR #12 by changing what the start-datetime is for new partitions after we mispredicted - without this change, if we had partitions through to December, but it's only August and we need more, the new partitions would be named for January instead of reflecting reality that they need to be named for Right Now. This also catches a bug where we could get timestamp name collisions. This is a lot less of an issue when I implement Tim's suggestion in #19, but for now this just increases dates by a day to avoid a collision, and that works well. --- partitionmanager/cli_test.py | 8 +-- partitionmanager/table_append_partition.py | 57 ++++++++++++++----- .../table_append_partition_test.py | 33 ++++++----- 3 files changed, 66 insertions(+), 32 deletions(-) diff --git a/partitionmanager/cli_test.py b/partitionmanager/cli_test.py index c0f4f48..bfdae7f 100644 --- a/partitionmanager/cli_test.py +++ b/partitionmanager/cli_test.py @@ -79,8 +79,8 @@ def test_partition_cmd_noop(self): "sql": ( "ALTER TABLE `testtable_noop` REORGANIZE PARTITION " "`p_20201204` INTO " - "(PARTITION `p_20201205` VALUES LESS THAN (548), " - "PARTITION `p_20210104` VALUES LESS THAN MAXVALUE);" + "(PARTITION `p_20201112` VALUES LESS THAN (548), " + "PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);" ), "noop": True, } @@ -101,8 +101,8 @@ def test_partition_cmd_final(self): "sql": ( "ALTER TABLE `testtable_commit` REORGANIZE PARTITION " "`p_20201204` INTO " - "(PARTITION `p_20201205` VALUES LESS THAN (548), " - "PARTITION `p_20210104` VALUES LESS THAN MAXVALUE);" + "(PARTITION `p_20201112` VALUES LESS THAN (548), " + "PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);" ), } }, diff --git a/partitionmanager/table_append_partition.py b/partitionmanager/table_append_partition.py index f2349be..6cf2959 100644 --- a/partitionmanager/table_append_partition.py +++ b/partitionmanager/table_append_partition.py @@ -308,7 +308,8 @@ def _predict_forward_time(current_position, end_position, rates, evaluation_time if max(days_remaining) < 0: raise ValueError(f"All values are negative: {days_remaining}") - return evaluation_time + (max(days_remaining) * timedelta(days=1)) + calculated = evaluation_time + (max(days_remaining) * timedelta(days=1)) + return calculated.replace(minute=0, second=0, microsecond=0) def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan): @@ -323,7 +324,7 @@ def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan): if partition_start_time < evaluation_time: # Partition start times should never be in the past. return evaluation_time - return partition_start_time + return partition_start_time.replace(minute=0, second=0, microsecond=0) def _plan_partition_changes( @@ -378,10 +379,13 @@ def _plan_partition_changes( # to exclude the future-dated, irrelevant partition. log.debug( f"Misprediction: Evaluation time ({evaluation_time}) is " - f"before the active partition {active_partition}. Excluding from " - "rate calculations." + f"before the active partition {active_partition}. Excluding " + "mispredicted partitions from the rate calculations." ) - rate_relevant_partitions = filled_partitions + [ + filled_partitions = filter( + lambda f: f.timestamp() < evaluation_time, filled_partitions + ) + rate_relevant_partitions = list(filled_partitions) + [ partitionmanager.types.InstantPartition(evaluation_time, current_position) ] @@ -403,16 +407,16 @@ def _plan_partition_changes( changed_partition = partitionmanager.types.ChangePlannedPartition(partition) + start_of_fill_time = _predict_forward_time( + current_position, last_changed.position, rates, evaluation_time + ) + if isinstance(partition, partitionmanager.types.PositionPartition): # We can't change the position on this partition, but we can adjust # the name to be more exact as to what date we expect it to begin # filling. If we calculate the start-of-fill date and it doesn't # match the partition's name, let's rename it and mark it as an # important change. - start_of_fill_time = _predict_forward_time( - current_position, last_changed.position, rates, evaluation_time - ) - if start_of_fill_time.date() != partition.timestamp().date(): log.info( f"Start-of-fill predicted at {start_of_fill_time.date()} " @@ -427,15 +431,21 @@ def _plan_partition_changes( # we calculate forward what position we expect and use it in the # future. - partition_start_time = _calculate_start_time( + nominal_partition_start_time = _calculate_start_time( last_changed.timestamp(), evaluation_time, allowed_lifespan ) + + # We use the nearest timestamp, which should generally be the + # calculated time, but could be the fill time based on predicting + # forward if we have gotten far off in our predictions in the past. + changed_partition.set_timestamp( + min(nominal_partition_start_time, start_of_fill_time) + ) + changed_part_pos = _predict_forward_position( last_changed.position.as_list(), rates, allowed_lifespan ) - changed_partition.set_position(changed_part_pos).set_timestamp( - partition_start_time - ) + changed_partition.set_position(changed_part_pos) results.append(changed_partition) @@ -455,6 +465,27 @@ def _plan_partition_changes( .set_timestamp(partition_start_time) ) + # Confirm we won't make timestamp conflicts + existing_timestamps = list(map(lambda p: p.timestamp(), partition_list)) + conflict_found = True + while conflict_found: + conflict_found = False + for partition in results: + if partition.timestamp() in existing_timestamps: + if ( + isinstance(partition, partitionmanager.types.ChangePlannedPartition) + and partition.timestamp() == partition.old.timestamp() + ): + # That's not a conflict + continue + + log.debug( + f"{partition} has a conflict for its timestamp, increasing by 1 day." + ) + partition.set_timestamp(partition.timestamp() + timedelta(days=1)) + conflict_found = True + break + # Final result is always MAXVALUE results[-1].set_as_max_value() diff --git a/partitionmanager/table_append_partition_test.py b/partitionmanager/table_append_partition_test.py index 822eefd..8fdb2c0 100644 --- a/partitionmanager/table_append_partition_test.py +++ b/partitionmanager/table_append_partition_test.py @@ -565,16 +565,16 @@ def test_plan_partition_changes_wildly_off_dates(self): ) self.assertEqual( - planned, [ ChangePlannedPartition(mkPPart("p_20201231", 100)), ChangePlannedPartition(mkPPart("p_20210104", 200)) .set_timestamp(datetime(2021, 1, 2, tzinfo=timezone.utc)) .set_important(), ChangePlannedPartition(mkTailPart("future")).set_timestamp( - datetime(2021, 1, 9, tzinfo=timezone.utc) + datetime(2021, 1, 5, tzinfo=timezone.utc) ), ], + planned, ) def test_plan_partition_changes_long_delay(self): @@ -604,6 +604,7 @@ def test_plan_partition_changes_long_delay(self): ) def test_plan_partition_changes_short_names(self): + self.maxDiff = None planned = _plan_partition_changes( [ mkPPart("p_2019", 1912499867), @@ -679,6 +680,7 @@ def test_plan_partition_changes_bespoke_names(self): ) def test_plan_partition_changes(self): + self.maxDiff = None planned = _plan_partition_changes( [ mkPPart("p_20201231", 100), @@ -697,7 +699,7 @@ def test_plan_partition_changes(self): ChangePlannedPartition(mkPPart("p_20201231", 100)), ChangePlannedPartition(mkPPart("p_20210102", 200)), ChangePlannedPartition(mkTailPart("future")).set_timestamp( - datetime(2021, 1, 9, tzinfo=timezone.utc) + datetime(2021, 1, 4, tzinfo=timezone.utc) ), ], ) @@ -718,19 +720,20 @@ def test_plan_partition_changes(self): ChangePlannedPartition(mkPPart("p_20210102", 200)).set_position([200]), ChangePlannedPartition(mkTailPart("future")) .set_position([320]) - .set_timestamp(datetime(2021, 1, 9, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 1, 3, tzinfo=timezone.utc)), NewPlannedPartition() .set_position([440]) - .set_timestamp(datetime(2021, 1, 16, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 1, 10, tzinfo=timezone.utc)), NewPlannedPartition() .set_columns(1) - .set_timestamp(datetime(2021, 1, 23, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 1, 17, tzinfo=timezone.utc)), ], ) def test_plan_partition_changes_misprediction(self): """ We have to handle the case where the partition list doesn't cleanly match reality. """ + self.maxDiff = None planned = _plan_partition_changes( [ mkPPart("p_20210505", 9505010028), @@ -748,15 +751,15 @@ def test_plan_partition_changes_misprediction(self): planned, [ ChangePlannedPartition(mkPPart("p_20210704", 10799505006)), - ChangePlannedPartition(mkTailPart("p_20210803")).set_position( - [11578057459] - ), + ChangePlannedPartition(mkTailPart("p_20210803")) + .set_position([11578057459]) + .set_timestamp(datetime(2021, 6, 28, tzinfo=timezone.utc)), NewPlannedPartition() .set_position([12356609912]) - .set_timestamp(datetime(2021, 9, 2, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 7, 28, tzinfo=timezone.utc)), NewPlannedPartition() .set_columns(1) - .set_timestamp(datetime(2021, 10, 2, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 8, 27, tzinfo=timezone.utc)), ], ) @@ -984,7 +987,7 @@ def test_plan_andgenerate_sql_reorganize_partition_commands_with_future_partitio list(generate_sql_reorganize_partition_commands(Table("water"), planned)), [ "ALTER TABLE `water` REORGANIZE PARTITION `future` INTO " - "(PARTITION `p_20210109` VALUES LESS THAN MAXVALUE);", + "(PARTITION `p_20210105` VALUES LESS THAN MAXVALUE);", "ALTER TABLE `water` REORGANIZE PARTITION `p_20210104` INTO " "(PARTITION `p_20210102` VALUES LESS THAN (200));", ], @@ -1046,9 +1049,9 @@ def test_get_pending_sql_reorganize_partition_commands_with_changes(self): list(cmds), [ "ALTER TABLE `plushies` REORGANIZE PARTITION `future` INTO " - "(PARTITION `p_20210109` VALUES LESS THAN (550), " - "PARTITION `p_20210116` VALUES LESS THAN (900), " - "PARTITION `p_20210123` VALUES LESS THAN MAXVALUE);" + "(PARTITION `p_20210104` VALUES LESS THAN (550), " + "PARTITION `p_20210111` VALUES LESS THAN (900), " + "PARTITION `p_20210118` VALUES LESS THAN MAXVALUE);" ], )