Skip to content

Commit

Permalink
Add custom prefix option
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Sep 10, 2024
1 parent c1ca448 commit e45f4ff
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 6 deletions.
10 changes: 10 additions & 0 deletions oonipipeline/src/oonipipeline/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def _parse_csv(ctx, param, s: Optional[str]) -> List[str]:
""",
)
start_workers_option = click.option("--start-workers/--no-start-workers", default=True)
custom_prefix_option = click.option("--custom-prefix", type=str, default="")

def maybe_create_delete_tables(
clickhouse_url: str,
Expand Down Expand Up @@ -117,6 +118,7 @@ def cli(log_level: int):
@end_at_option
@probe_cc_option
@test_name_option
@custom_prefix_option
@click.option("--workflow-name", type=str, required=True)
@click.option(
"--create-tables",
Expand All @@ -132,6 +134,7 @@ def backfill(
probe_cc: List[str],
test_name: List[str],
workflow_name: str,
custom_prefix: str,
start_at: datetime,
end_at: datetime,
create_tables: bool,
Expand Down Expand Up @@ -164,15 +167,18 @@ def backfill(
test_name=test_name,
start_at=start_at,
end_at=end_at,
custom_prefix=custom_prefix,
)


@cli.command()
@probe_cc_option
@test_name_option
@custom_prefix_option
def schedule(
probe_cc: List[str],
test_name: List[str],
custom_prefix: str,
):
"""
Create schedules for the specified parameters
Expand All @@ -190,15 +196,18 @@ def schedule(
clickhouse_url=config.clickhouse_url,
data_dir=config.data_dir,
temporal_config=temporal_config,
custom_prefix=custom_prefix,
)


@cli.command()
@probe_cc_option
@test_name_option
@custom_prefix_option
def clear_schedules(
probe_cc: List[str],
test_name: List[str],
custom_prefix: str,
):
"""
Create schedules for the specified parameters
Expand All @@ -214,6 +223,7 @@ def clear_schedules(
probe_cc=probe_cc,
test_name=test_name,
temporal_config=temporal_config,
custom_prefix=custom_prefix,
)


Expand Down
12 changes: 12 additions & 0 deletions oonipipeline/src/oonipipeline/temporal/client_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ async def execute_backfill(
end_at: datetime,
workflow_name: str,
temporal_config: TemporalConfig,
custom_prefix: str,
):
log.info(f"creating all schedules")

Expand All @@ -131,6 +132,7 @@ async def execute_backfill(
start_at=start_at,
end_at=end_at,
workflow_name=workflow_name,
custom_prefix=custom_prefix,
)


Expand All @@ -140,6 +142,7 @@ async def create_schedules(
clickhouse_url: str,
data_dir: str,
temporal_config: TemporalConfig,
custom_prefix: str,
) -> ScheduleIdMap:
log.info(f"creating all schedules")

Expand All @@ -151,13 +154,15 @@ async def create_schedules(
test_name=test_name,
clickhouse_url=clickhouse_url,
data_dir=data_dir,
custom_prefix=custom_prefix,
)


async def execute_clear_schedules(
probe_cc: List[str],
test_name: List[str],
temporal_config: TemporalConfig,
custom_prefix: str,
) -> List[str]:
log.info(f"rescheduling everything")

Expand All @@ -167,6 +172,7 @@ async def execute_clear_schedules(
client=client,
probe_cc=probe_cc,
test_name=test_name,
custom_prefix=custom_prefix,
)


Expand Down Expand Up @@ -214,6 +220,7 @@ def run_backfill(
workflow_name: str,
start_at: datetime,
end_at: datetime,
custom_prefix: str = "",
):
try:
asyncio.run(
Expand All @@ -224,6 +231,7 @@ def run_backfill(
test_name=test_name,
start_at=start_at,
end_at=end_at,
custom_prefix=custom_prefix,
)
)
except KeyboardInterrupt:
Expand All @@ -236,6 +244,7 @@ def run_create_schedules(
clickhouse_url: str,
data_dir: str,
temporal_config: TemporalConfig,
custom_prefix: str = "",
):
try:
asyncio.run(
Expand All @@ -245,6 +254,7 @@ def run_create_schedules(
clickhouse_url=clickhouse_url,
data_dir=data_dir,
temporal_config=temporal_config,
custom_prefix=custom_prefix,
)
)
except KeyboardInterrupt:
Expand All @@ -255,13 +265,15 @@ def run_clear_schedules(
probe_cc: List[str],
test_name: List[str],
temporal_config: TemporalConfig,
custom_prefix: str = "",
):
try:
asyncio.run(
execute_clear_schedules(
probe_cc=probe_cc,
test_name=test_name,
temporal_config=temporal_config,
custom_prefix=custom_prefix,
)
)
except KeyboardInterrupt:
Expand Down
27 changes: 21 additions & 6 deletions oonipipeline/src/oonipipeline/temporal/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ async def list_existing_schedules(
client: TemporalClient,
probe_cc: List[str],
test_name: List[str],
custom_prefix: str = "",
):
schedule_id_map_list = ScheduleIdMapList(
observations=[],
Expand All @@ -71,7 +72,9 @@ async def list_existing_schedules(

schedule_list = await client.list_schedules()
async for sched in schedule_list:
if sched.id.startswith(f"{OBSERVATIONS_SCHED_PREFIX}-{filter_id}"):
if sched.id.startswith(
f"{custom_prefix}{OBSERVATIONS_SCHED_PREFIX}-{filter_id}"
):
schedule_id_map_list.observations.append(sched.id)
elif sched.id.startswith(f"{ANALYSIS_SCHED_PREFIX}-{filter_id}"):
schedule_id_map_list.analysis.append(sched.id)
Expand All @@ -85,6 +88,7 @@ async def schedule_all(
test_name: List[str],
clickhouse_url: str,
data_dir: str,
custom_prefix: str = "",
) -> ScheduleIdMap:
schedule_id_map = ScheduleIdMap()
filter_id = gen_schedule_filter_id(probe_cc, test_name)
Expand All @@ -96,7 +100,10 @@ async def schedule_all(
ts = datetime.now(timezone.utc).strftime("%y.%m.%d_%H%M%S")

existing_schedules = await list_existing_schedules(
client=client, probe_cc=probe_cc, test_name=test_name
client=client,
probe_cc=probe_cc,
test_name=test_name,
custom_prefix=custom_prefix,
)

assert len(existing_schedules.observations) == 0
Expand All @@ -115,7 +122,7 @@ async def schedule_all(
action=ScheduleActionStartWorkflow(
ObservationsWorkflow.run,
obs_params,
id=f"{OBSERVATIONS_WF_PREFIX}-{filter_id}-{ts}",
id=f"{custom_prefix}{OBSERVATIONS_WF_PREFIX}-{filter_id}-{ts}",
task_queue=TASK_QUEUE_NAME,
execution_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT,
task_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT,
Expand Down Expand Up @@ -149,7 +156,7 @@ async def schedule_all(
action=ScheduleActionStartWorkflow(
AnalysisWorkflow.run,
analysis_params,
id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}",
id=f"{custom_prefix}{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}",
task_queue=TASK_QUEUE_NAME,
execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
Expand Down Expand Up @@ -183,10 +190,14 @@ async def clear_schedules(
client: TemporalClient,
probe_cc: List[str],
test_name: List[str],
custom_prefix: str = "",
) -> List[str]:
schedule_ids = []
existing_schedules = await list_existing_schedules(
client=client, probe_cc=probe_cc, test_name=test_name
client=client,
probe_cc=probe_cc,
test_name=test_name,
custom_prefix=custom_prefix,
)
for sid in existing_schedules.observations + existing_schedules.analysis:
log.info(f"deleting schedule {sid}")
Expand All @@ -202,9 +213,13 @@ async def schedule_backfill(
end_at: datetime,
probe_cc: List[str],
test_name: List[str],
custom_prefix: str = "",
):
existing_schedules = await list_existing_schedules(
client=client, probe_cc=probe_cc, test_name=test_name
client=client,
probe_cc=probe_cc,
test_name=test_name,
custom_prefix=custom_prefix,
)
if workflow_name == "observations":
assert (
Expand Down

0 comments on commit e45f4ff

Please sign in to comment.