Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor do_partition to reduce the statement count #81

Merged
merged 7 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 54 additions & 54 deletions partitionmanager/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,55 @@ def migrate_cmd(args):
MIGRATE_PARSER.set_defaults(func=migrate_cmd)


def _partition_table(conf, log, table, metrics):
if table_problems := pm_tap.get_table_compatibility_problems(conf.dbcmd, table):
log.error(f"Cannot proceed: {table} {table_problems}")
return None

map_data = pm_tap.get_partition_map(conf.dbcmd, table)

duration = table.partition_period or conf.partition_period

log.info(f"Evaluating {table} (duration={duration})")
cur_pos = partitionmanager.database_helpers.get_position_of_table(
conf.dbcmd, table, map_data
)

sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands(
database=conf.dbcmd,
table=table,
partition_list=map_data["partitions"],
current_position=cur_pos,
allowed_lifespan=duration,
num_empty_partitions=conf.num_empty,
evaluation_time=conf.curtime,
)

if not sql_cmds:
log.debug(f"{table} has no pending SQL updates.")
return None

composite_sql_command = "\n".join(sql_cmds)

if conf.noop:
log.info(f"{table} planned SQL: {composite_sql_command}")
return {"sql": composite_sql_command, "noop": True}

log.info(f"{table} running SQL: {composite_sql_command}")

time_start = datetime.now(tz=timezone.utc)
output = conf.dbcmd.run(composite_sql_command)
time_end = datetime.now(tz=timezone.utc)
metrics.add(
"alter_time_seconds",
table.name,
(time_end - time_start).total_seconds(),
)

log.info(f"{table} results: {output}")
return {"sql": composite_sql_command, "output": output}


def do_partition(conf):
"""Produces SQL statements to manage partitions per the supplied configuration.

Expand Down Expand Up @@ -298,56 +347,15 @@ def do_partition(conf):

all_results = {}
for table in conf.tables:
time_start = None
try:
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
if table_problems:
log.error(f"Cannot proceed: {table} {table_problems}")
continue

map_data = pm_tap.get_partition_map(conf.dbcmd, table)

duration = conf.partition_period
if table.partition_period:
duration = table.partition_period

log.info(f"Evaluating {table} (duration={duration})")
cur_pos = partitionmanager.database_helpers.get_position_of_table(
conf.dbcmd, table, map_data
)

sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands(
database=conf.dbcmd,
table=table,
partition_list=map_data["partitions"],
current_position=cur_pos,
allowed_lifespan=duration,
num_empty_partitions=conf.num_empty,
evaluation_time=conf.curtime,
)

if not sql_cmds:
log.debug(f"{table} has no pending SQL updates.")
continue

composite_sql_command = "\n".join(sql_cmds)

if conf.noop:
all_results[table.name] = {"sql": composite_sql_command, "noop": True}
log.info(f"{table} planned SQL: {composite_sql_command}")
continue

log.info(f"{table} running SQL: {composite_sql_command}")
time_start = datetime.now(tz=timezone.utc)
output = conf.dbcmd.run(composite_sql_command)

all_results[table.name] = {"sql": composite_sql_command, "output": output}
log.info(f"{table} results: {output}")
if results := _partition_table(conf, log, table, metrics):
all_results[table.name] = results

except partitionmanager.types.NoEmptyPartitionsAvailableException:
log.warning(
f"Unable to automatically handle {table}: No empty "
"partition is available."
"Unable to automatically handle %s: No empty "
"partition is available.",
table,
)
except partitionmanager.types.DatabaseCommandException as e:
log.warning("Failed to automatically handle %s: %s", table, e)
Expand All @@ -358,14 +366,6 @@ def do_partition(conf):
log.warning("Failed to handle %s: %s", table, e)
metrics.add("alter_errors", table.name, 1)

time_end = datetime.now(tz=timezone.utc)
if time_start:
metrics.add(
"alter_time_seconds",
table.name,
(time_end - time_start).total_seconds(),
)

if conf.prometheus_stats_path:
do_stats(conf, metrics=metrics)
return all_results
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ ignore = [
max-complexity = 16 # default is 10

[tool.ruff.lint.per-file-ignores]
"partitionmanager/cli.py" = ["B008"] # TODO: Fix me
"partitionmanager/cli.py" = ["B008", "PERF203"] # TODO: Fix B008, upgrade to Py3.11 for PERF203
"partitionmanager/cli_test.py" = ["S608", "SIM115", "SIM117"] # TODO: Fix SIMs
"partitionmanager/sql.py" = ["B904", "S603"] # TODO: Fix S603
"partitionmanager/table_append_partition.py" = ["S608", "SIM102"] # TODO: Fix S608
Expand All @@ -119,4 +119,3 @@ max-complexity = 16 # default is 10
[tool.ruff.lint.pylint]
max-args = 7 # default is 5
max-branches = 15 # default is 12
max-statements = 54 # default is 50