diff --git a/partitionmanager/cli_test.py b/partitionmanager/cli_test.py index c0f4f48..bfdae7f 100644 --- a/partitionmanager/cli_test.py +++ b/partitionmanager/cli_test.py @@ -79,8 +79,8 @@ def test_partition_cmd_noop(self): "sql": ( "ALTER TABLE `testtable_noop` REORGANIZE PARTITION " "`p_20201204` INTO " - "(PARTITION `p_20201205` VALUES LESS THAN (548), " - "PARTITION `p_20210104` VALUES LESS THAN MAXVALUE);" + "(PARTITION `p_20201112` VALUES LESS THAN (548), " + "PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);" ), "noop": True, } @@ -101,8 +101,8 @@ def test_partition_cmd_final(self): "sql": ( "ALTER TABLE `testtable_commit` REORGANIZE PARTITION " "`p_20201204` INTO " - "(PARTITION `p_20201205` VALUES LESS THAN (548), " - "PARTITION `p_20210104` VALUES LESS THAN MAXVALUE);" + "(PARTITION `p_20201112` VALUES LESS THAN (548), " + "PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);" ), } }, diff --git a/partitionmanager/table_append_partition.py b/partitionmanager/table_append_partition.py index f2349be..6cf2959 100644 --- a/partitionmanager/table_append_partition.py +++ b/partitionmanager/table_append_partition.py @@ -308,7 +308,8 @@ def _predict_forward_time(current_position, end_position, rates, evaluation_time if max(days_remaining) < 0: raise ValueError(f"All values are negative: {days_remaining}") - return evaluation_time + (max(days_remaining) * timedelta(days=1)) + calculated = evaluation_time + (max(days_remaining) * timedelta(days=1)) + return calculated.replace(minute=0, second=0, microsecond=0) def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan): @@ -323,7 +324,7 @@ def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan): if partition_start_time < evaluation_time: # Partition start times should never be in the past. return evaluation_time - return partition_start_time + return partition_start_time.replace(minute=0, second=0, microsecond=0) def _plan_partition_changes( @@ -378,10 +379,13 @@ def _plan_partition_changes( # to exclude the future-dated, irrelevant partition. log.debug( f"Misprediction: Evaluation time ({evaluation_time}) is " - f"before the active partition {active_partition}. Excluding from " - "rate calculations." + f"before the active partition {active_partition}. Excluding " + "mispredicted partitions from the rate calculations." ) - rate_relevant_partitions = filled_partitions + [ + filled_partitions = filter( + lambda f: f.timestamp() < evaluation_time, filled_partitions + ) + rate_relevant_partitions = list(filled_partitions) + [ partitionmanager.types.InstantPartition(evaluation_time, current_position) ] @@ -403,16 +407,16 @@ def _plan_partition_changes( changed_partition = partitionmanager.types.ChangePlannedPartition(partition) + start_of_fill_time = _predict_forward_time( + current_position, last_changed.position, rates, evaluation_time + ) + if isinstance(partition, partitionmanager.types.PositionPartition): # We can't change the position on this partition, but we can adjust # the name to be more exact as to what date we expect it to begin # filling. If we calculate the start-of-fill date and it doesn't # match the partition's name, let's rename it and mark it as an # important change. - start_of_fill_time = _predict_forward_time( - current_position, last_changed.position, rates, evaluation_time - ) - if start_of_fill_time.date() != partition.timestamp().date(): log.info( f"Start-of-fill predicted at {start_of_fill_time.date()} " @@ -427,15 +431,21 @@ def _plan_partition_changes( # we calculate forward what position we expect and use it in the # future. - partition_start_time = _calculate_start_time( + nominal_partition_start_time = _calculate_start_time( last_changed.timestamp(), evaluation_time, allowed_lifespan ) + + # We use the nearest timestamp, which should generally be the + # calculated time, but could be the fill time based on predicting + # forward if we have gotten far off in our predictions in the past. + changed_partition.set_timestamp( + min(nominal_partition_start_time, start_of_fill_time) + ) + changed_part_pos = _predict_forward_position( last_changed.position.as_list(), rates, allowed_lifespan ) - changed_partition.set_position(changed_part_pos).set_timestamp( - partition_start_time - ) + changed_partition.set_position(changed_part_pos) results.append(changed_partition) @@ -455,6 +465,27 @@ def _plan_partition_changes( .set_timestamp(partition_start_time) ) + # Confirm we won't make timestamp conflicts + existing_timestamps = list(map(lambda p: p.timestamp(), partition_list)) + conflict_found = True + while conflict_found: + conflict_found = False + for partition in results: + if partition.timestamp() in existing_timestamps: + if ( + isinstance(partition, partitionmanager.types.ChangePlannedPartition) + and partition.timestamp() == partition.old.timestamp() + ): + # That's not a conflict + continue + + log.debug( + f"{partition} has a conflict for its timestamp, increasing by 1 day." + ) + partition.set_timestamp(partition.timestamp() + timedelta(days=1)) + conflict_found = True + break + # Final result is always MAXVALUE results[-1].set_as_max_value() diff --git a/partitionmanager/table_append_partition_test.py b/partitionmanager/table_append_partition_test.py index 822eefd..8fdb2c0 100644 --- a/partitionmanager/table_append_partition_test.py +++ b/partitionmanager/table_append_partition_test.py @@ -565,16 +565,16 @@ def test_plan_partition_changes_wildly_off_dates(self): ) self.assertEqual( - planned, [ ChangePlannedPartition(mkPPart("p_20201231", 100)), ChangePlannedPartition(mkPPart("p_20210104", 200)) .set_timestamp(datetime(2021, 1, 2, tzinfo=timezone.utc)) .set_important(), ChangePlannedPartition(mkTailPart("future")).set_timestamp( - datetime(2021, 1, 9, tzinfo=timezone.utc) + datetime(2021, 1, 5, tzinfo=timezone.utc) ), ], + planned, ) def test_plan_partition_changes_long_delay(self): @@ -604,6 +604,7 @@ def test_plan_partition_changes_long_delay(self): ) def test_plan_partition_changes_short_names(self): + self.maxDiff = None planned = _plan_partition_changes( [ mkPPart("p_2019", 1912499867), @@ -679,6 +680,7 @@ def test_plan_partition_changes_bespoke_names(self): ) def test_plan_partition_changes(self): + self.maxDiff = None planned = _plan_partition_changes( [ mkPPart("p_20201231", 100), @@ -697,7 +699,7 @@ def test_plan_partition_changes(self): ChangePlannedPartition(mkPPart("p_20201231", 100)), ChangePlannedPartition(mkPPart("p_20210102", 200)), ChangePlannedPartition(mkTailPart("future")).set_timestamp( - datetime(2021, 1, 9, tzinfo=timezone.utc) + datetime(2021, 1, 4, tzinfo=timezone.utc) ), ], ) @@ -718,19 +720,20 @@ def test_plan_partition_changes(self): ChangePlannedPartition(mkPPart("p_20210102", 200)).set_position([200]), ChangePlannedPartition(mkTailPart("future")) .set_position([320]) - .set_timestamp(datetime(2021, 1, 9, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 1, 3, tzinfo=timezone.utc)), NewPlannedPartition() .set_position([440]) - .set_timestamp(datetime(2021, 1, 16, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 1, 10, tzinfo=timezone.utc)), NewPlannedPartition() .set_columns(1) - .set_timestamp(datetime(2021, 1, 23, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 1, 17, tzinfo=timezone.utc)), ], ) def test_plan_partition_changes_misprediction(self): """ We have to handle the case where the partition list doesn't cleanly match reality. """ + self.maxDiff = None planned = _plan_partition_changes( [ mkPPart("p_20210505", 9505010028), @@ -748,15 +751,15 @@ def test_plan_partition_changes_misprediction(self): planned, [ ChangePlannedPartition(mkPPart("p_20210704", 10799505006)), - ChangePlannedPartition(mkTailPart("p_20210803")).set_position( - [11578057459] - ), + ChangePlannedPartition(mkTailPart("p_20210803")) + .set_position([11578057459]) + .set_timestamp(datetime(2021, 6, 28, tzinfo=timezone.utc)), NewPlannedPartition() .set_position([12356609912]) - .set_timestamp(datetime(2021, 9, 2, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 7, 28, tzinfo=timezone.utc)), NewPlannedPartition() .set_columns(1) - .set_timestamp(datetime(2021, 10, 2, tzinfo=timezone.utc)), + .set_timestamp(datetime(2021, 8, 27, tzinfo=timezone.utc)), ], ) @@ -984,7 +987,7 @@ def test_plan_andgenerate_sql_reorganize_partition_commands_with_future_partitio list(generate_sql_reorganize_partition_commands(Table("water"), planned)), [ "ALTER TABLE `water` REORGANIZE PARTITION `future` INTO " - "(PARTITION `p_20210109` VALUES LESS THAN MAXVALUE);", + "(PARTITION `p_20210105` VALUES LESS THAN MAXVALUE);", "ALTER TABLE `water` REORGANIZE PARTITION `p_20210104` INTO " "(PARTITION `p_20210102` VALUES LESS THAN (200));", ], @@ -1046,9 +1049,9 @@ def test_get_pending_sql_reorganize_partition_commands_with_changes(self): list(cmds), [ "ALTER TABLE `plushies` REORGANIZE PARTITION `future` INTO " - "(PARTITION `p_20210109` VALUES LESS THAN (550), " - "PARTITION `p_20210116` VALUES LESS THAN (900), " - "PARTITION `p_20210123` VALUES LESS THAN MAXVALUE);" + "(PARTITION `p_20210104` VALUES LESS THAN (550), " + "PARTITION `p_20210111` VALUES LESS THAN (900), " + "PARTITION `p_20210118` VALUES LESS THAN MAXVALUE);" ], )