Skip to content

Commit

Permalink
Handle mispredictions cleanly
Browse files Browse the repository at this point in the history
In the real world, we might mess up when naming a partition. This should be
rare if partitionmanager is running often, since it'll rename partitions
to match reality, but when it's running only rarely, things get out of date.

This change avoids attempting to calculate rates-of-change using partitions
that don't make sense - e.g., today is July 1, and our active partition
says it starts in a week. That is plainly wrong, but we can still use our
current rate-of-change.

This expands on PR #12 by changing what the start-datetime is for new
partitions after we mispredicted - without this change, if we had partitions
through to December, but it's only August and we need more, the new partitions
would be named for January instead of reflecting reality that they need to
be named for Right Now.

This also catches a bug where we could get timestamp name collisions. This is
a lot less of an issue when I implement Tim's suggestion in #19, but for now
this just increases dates by a day to avoid a collision, and that works well.
  • Loading branch information
jcjones committed Sep 20, 2021
1 parent 51e10be commit 719a4c2
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 32 deletions.
8 changes: 4 additions & 4 deletions partitionmanager/cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def test_partition_cmd_noop(self):
"sql": (
"ALTER TABLE `testtable_noop` REORGANIZE PARTITION "
"`p_20201204` INTO "
"(PARTITION `p_20201205` VALUES LESS THAN (548), "
"PARTITION `p_20210104` VALUES LESS THAN MAXVALUE);"
"(PARTITION `p_20201112` VALUES LESS THAN (548), "
"PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);"
),
"noop": True,
}
Expand All @@ -101,8 +101,8 @@ def test_partition_cmd_final(self):
"sql": (
"ALTER TABLE `testtable_commit` REORGANIZE PARTITION "
"`p_20201204` INTO "
"(PARTITION `p_20201205` VALUES LESS THAN (548), "
"PARTITION `p_20210104` VALUES LESS THAN MAXVALUE);"
"(PARTITION `p_20201112` VALUES LESS THAN (548), "
"PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);"
),
}
},
Expand Down
57 changes: 44 additions & 13 deletions partitionmanager/table_append_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ def _predict_forward_time(current_position, end_position, rates, evaluation_time

if max(days_remaining) < 0:
raise ValueError(f"All values are negative: {days_remaining}")
return evaluation_time + (max(days_remaining) * timedelta(days=1))
calculated = evaluation_time + (max(days_remaining) * timedelta(days=1))
return calculated.replace(minute=0, second=0, microsecond=0)


def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan):
Expand All @@ -323,7 +324,7 @@ def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan):
if partition_start_time < evaluation_time:
# Partition start times should never be in the past.
return evaluation_time
return partition_start_time
return partition_start_time.replace(minute=0, second=0, microsecond=0)


def _plan_partition_changes(
Expand Down Expand Up @@ -378,10 +379,13 @@ def _plan_partition_changes(
# to exclude the future-dated, irrelevant partition.
log.debug(
f"Misprediction: Evaluation time ({evaluation_time}) is "
f"before the active partition {active_partition}. Excluding from "
"rate calculations."
f"before the active partition {active_partition}. Excluding "
"mispredicted partitions from the rate calculations."
)
rate_relevant_partitions = filled_partitions + [
filled_partitions = filter(
lambda f: f.timestamp() < evaluation_time, filled_partitions
)
rate_relevant_partitions = list(filled_partitions) + [
partitionmanager.types.InstantPartition(evaluation_time, current_position)
]

Expand All @@ -403,16 +407,16 @@ def _plan_partition_changes(

changed_partition = partitionmanager.types.ChangePlannedPartition(partition)

start_of_fill_time = _predict_forward_time(
current_position, last_changed.position, rates, evaluation_time
)

if isinstance(partition, partitionmanager.types.PositionPartition):
# We can't change the position on this partition, but we can adjust
# the name to be more exact as to what date we expect it to begin
# filling. If we calculate the start-of-fill date and it doesn't
# match the partition's name, let's rename it and mark it as an
# important change.
start_of_fill_time = _predict_forward_time(
current_position, last_changed.position, rates, evaluation_time
)

if start_of_fill_time.date() != partition.timestamp().date():
log.info(
f"Start-of-fill predicted at {start_of_fill_time.date()} "
Expand All @@ -427,15 +431,21 @@ def _plan_partition_changes(
# we calculate forward what position we expect and use it in the
# future.

partition_start_time = _calculate_start_time(
nominal_partition_start_time = _calculate_start_time(
last_changed.timestamp(), evaluation_time, allowed_lifespan
)

# We use the nearest timestamp, which should generally be the
# calculated time, but could be the fill time based on predicting
# forward if we have gotten far off in our predictions in the past.
changed_partition.set_timestamp(
min(nominal_partition_start_time, start_of_fill_time)
)

changed_part_pos = _predict_forward_position(
last_changed.position.as_list(), rates, allowed_lifespan
)
changed_partition.set_position(changed_part_pos).set_timestamp(
partition_start_time
)
changed_partition.set_position(changed_part_pos)

results.append(changed_partition)

Expand All @@ -455,6 +465,27 @@ def _plan_partition_changes(
.set_timestamp(partition_start_time)
)

# Confirm we won't make timestamp conflicts
existing_timestamps = list(map(lambda p: p.timestamp(), partition_list))
conflict_found = True
while conflict_found:
conflict_found = False
for partition in results:
if partition.timestamp() in existing_timestamps:
if (
isinstance(partition, partitionmanager.types.ChangePlannedPartition)
and partition.timestamp() == partition.old.timestamp()
):
# That's not a conflict
continue

log.debug(
f"{partition} has a conflict for its timestamp, increasing by 1 day."
)
partition.set_timestamp(partition.timestamp() + timedelta(days=1))
conflict_found = True
break

# Final result is always MAXVALUE
results[-1].set_as_max_value()

Expand Down
33 changes: 18 additions & 15 deletions partitionmanager/table_append_partition_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,16 +565,16 @@ def test_plan_partition_changes_wildly_off_dates(self):
)

self.assertEqual(
planned,
[
ChangePlannedPartition(mkPPart("p_20201231", 100)),
ChangePlannedPartition(mkPPart("p_20210104", 200))
.set_timestamp(datetime(2021, 1, 2, tzinfo=timezone.utc))
.set_important(),
ChangePlannedPartition(mkTailPart("future")).set_timestamp(
datetime(2021, 1, 9, tzinfo=timezone.utc)
datetime(2021, 1, 5, tzinfo=timezone.utc)
),
],
planned,
)

def test_plan_partition_changes_long_delay(self):
Expand Down Expand Up @@ -604,6 +604,7 @@ def test_plan_partition_changes_long_delay(self):
)

def test_plan_partition_changes_short_names(self):
self.maxDiff = None
planned = _plan_partition_changes(
[
mkPPart("p_2019", 1912499867),
Expand Down Expand Up @@ -679,6 +680,7 @@ def test_plan_partition_changes_bespoke_names(self):
)

def test_plan_partition_changes(self):
self.maxDiff = None
planned = _plan_partition_changes(
[
mkPPart("p_20201231", 100),
Expand All @@ -697,7 +699,7 @@ def test_plan_partition_changes(self):
ChangePlannedPartition(mkPPart("p_20201231", 100)),
ChangePlannedPartition(mkPPart("p_20210102", 200)),
ChangePlannedPartition(mkTailPart("future")).set_timestamp(
datetime(2021, 1, 9, tzinfo=timezone.utc)
datetime(2021, 1, 4, tzinfo=timezone.utc)
),
],
)
Expand All @@ -718,19 +720,20 @@ def test_plan_partition_changes(self):
ChangePlannedPartition(mkPPart("p_20210102", 200)).set_position([200]),
ChangePlannedPartition(mkTailPart("future"))
.set_position([320])
.set_timestamp(datetime(2021, 1, 9, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 1, 3, tzinfo=timezone.utc)),
NewPlannedPartition()
.set_position([440])
.set_timestamp(datetime(2021, 1, 16, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 1, 10, tzinfo=timezone.utc)),
NewPlannedPartition()
.set_columns(1)
.set_timestamp(datetime(2021, 1, 23, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 1, 17, tzinfo=timezone.utc)),
],
)

def test_plan_partition_changes_misprediction(self):
""" We have to handle the case where the partition list doesn't cleanly
match reality. """
self.maxDiff = None
planned = _plan_partition_changes(
[
mkPPart("p_20210505", 9505010028),
Expand All @@ -748,15 +751,15 @@ def test_plan_partition_changes_misprediction(self):
planned,
[
ChangePlannedPartition(mkPPart("p_20210704", 10799505006)),
ChangePlannedPartition(mkTailPart("p_20210803")).set_position(
[11578057459]
),
ChangePlannedPartition(mkTailPart("p_20210803"))
.set_position([11578057459])
.set_timestamp(datetime(2021, 6, 28, tzinfo=timezone.utc)),
NewPlannedPartition()
.set_position([12356609912])
.set_timestamp(datetime(2021, 9, 2, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 7, 28, tzinfo=timezone.utc)),
NewPlannedPartition()
.set_columns(1)
.set_timestamp(datetime(2021, 10, 2, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 8, 27, tzinfo=timezone.utc)),
],
)

Expand Down Expand Up @@ -984,7 +987,7 @@ def test_plan_andgenerate_sql_reorganize_partition_commands_with_future_partitio
list(generate_sql_reorganize_partition_commands(Table("water"), planned)),
[
"ALTER TABLE `water` REORGANIZE PARTITION `future` INTO "
"(PARTITION `p_20210109` VALUES LESS THAN MAXVALUE);",
"(PARTITION `p_20210105` VALUES LESS THAN MAXVALUE);",
"ALTER TABLE `water` REORGANIZE PARTITION `p_20210104` INTO "
"(PARTITION `p_20210102` VALUES LESS THAN (200));",
],
Expand Down Expand Up @@ -1046,9 +1049,9 @@ def test_get_pending_sql_reorganize_partition_commands_with_changes(self):
list(cmds),
[
"ALTER TABLE `plushies` REORGANIZE PARTITION `future` INTO "
"(PARTITION `p_20210109` VALUES LESS THAN (550), "
"PARTITION `p_20210116` VALUES LESS THAN (900), "
"PARTITION `p_20210123` VALUES LESS THAN MAXVALUE);"
"(PARTITION `p_20210104` VALUES LESS THAN (550), "
"PARTITION `p_20210111` VALUES LESS THAN (900), "
"PARTITION `p_20210118` VALUES LESS THAN MAXVALUE);"
],
)

Expand Down

0 comments on commit 719a4c2

Please sign in to comment.