Commit fe05fa5: debug

hellais committed Sep 4, 2024
1 parent 523119f
Showing 2 changed files with 23 additions and 11 deletions.
oonipipeline/src/oonipipeline/temporal/activities/observations.py (15 additions & 4 deletions)

@@ -67,34 +67,44 @@ def write_observations_to_db(

 @dataclass
 class MakeObservationsFileEntryBatch:
-    file_entry_batch: List[FileEntryBatchType]
+    batch_idx: int
     clickhouse: str
     write_batch_size: int
     data_dir: str
+    bucket_date: str
     probe_cc: List[str]
+    test_name: List[str]
-    bucket_date: str
     fast_fail: bool
 
 
 @activity.defn
 def make_observations_for_file_entry_batch(
     params: MakeObservationsFileEntryBatch,
 ) -> int:
+    day = datetime.strptime(params.bucket_date, "%Y-%m-%d").date()
     data_dir = pathlib.Path(params.data_dir)
 
     netinfodb = NetinfoDB(datadir=data_dir, download=False)
     tbatch = PerfTimer()
 
     tracer = trace.get_tracer(__name__)
 
+    file_entry_batches, _ = list_file_entries_batches(
+        probe_cc=params.probe_cc,
+        test_name=params.test_name,
+        start_day=day,
+        end_day=day + timedelta(days=1),
+    )
+
     total_failure_count = 0
     current_span = trace.get_current_span()
     with ClickhouseConnection(
         params.clickhouse, write_batch_size=params.write_batch_size
     ) as db:
         ccs = ccs_set(params.probe_cc)
         idx = 0
-        for bucket_name, s3path, ext, fe_size in params.file_entry_batch:
+        for bucket_name, s3path, ext, fe_size in file_entry_batches:
             failure_count = 0
             # Nest the traced span within the current span
             with tracer.start_span("MakeObservations:stream_file_entry") as span:
@@ -158,7 +168,8 @@ def make_observations_for_file_entry_batch(


 ObservationBatches = TypedDict(
-    "ObservationBatches", {"batches": List[List[FileEntryBatchType]], "total_size": int}
+    "ObservationBatches",
+    {"batch_count": int, "total_size": int},
 )


@@ -174,7 +185,7 @@ def make_observation_batches(params: MakeObservationsParams) -> ObservationBatches:
         end_day=day + timedelta(days=1),
     )
     log.info(f"listing {len(file_entry_batches)} batches took {t.pretty}")
-    return {"batches": file_entry_batches, "total_size": total_size}
+    return {"batch_count": len(file_entry_batches), "total_size": total_size}
 
 
 @dataclass
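A note on the refactor in this file: before this commit, make_observation_batches returned the full list of file entry batches and the workflow passed each batch into the per-batch activity as input. Temporal serializes activity inputs and results into workflow history, so shipping large batch payloads through that boundary is costly and can run into payload size limits. After the change, only small scalars cross the boundary: the listing activity returns a batch_count, and each per-batch activity re-derives the batches from (probe_cc, test_name, bucket_date). Note that the hunk above still iterates every listed batch; selecting the single batch for params.batch_idx is not visible in this excerpt. Below is a minimal sketch of the index-based pattern, with hypothetical names (list_batches, process_batch, BatchParams) standing in for the real functions, and assuming the listing is deterministic:

    from dataclasses import dataclass
    from typing import List, Tuple

    # (bucket_name, s3path, ext, fe_size), matching the tuple unpacked in the
    # activity loop above.
    FileEntry = Tuple[str, str, str, int]

    def list_batches(probe_cc: List[str], test_name: List[str], bucket_date: str) -> List[List[FileEntry]]:
        # Hypothetical stand-in for list_file_entries_batches(). It must be
        # deterministic: the workflow and every activity must see the same
        # batches in the same order for the same inputs.
        return [
            [("ooni-data-eu-fra", f"raw/{bucket_date}/file_{i}.jsonl.gz", "jsonl.gz", 1024)]
            for i in range(3)
        ]

    @dataclass
    class BatchParams:
        batch_idx: int
        probe_cc: List[str]
        test_name: List[str]
        bucket_date: str

    def process_batch(params: BatchParams) -> int:
        # Re-derive the batch list inside the activity and select by index, so
        # only small scalars ever cross the activity boundary.
        batches = list_batches(params.probe_cc, params.test_name, params.bucket_date)
        batch = batches[params.batch_idx]
        return len(batch)  # stand-in for the per-batch measurement count

The scheduler side then only needs the number of batches to fan out, which is exactly what the slimmed-down ObservationBatches TypedDict now carries.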
oonipipeline/src/oonipipeline/temporal/workflows/observations.py (8 additions & 7 deletions)

@@ -70,7 +70,7 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
         await workflow.execute_activity(
             optimize_all_tables,
             ClickhouseParams(clickhouse_url=params.clickhouse),
-            start_to_close_timeout=timedelta(minutes=5),
+            start_to_close_timeout=timedelta(minutes=20),
             retry_policy=RetryPolicy(maximum_attempts=10),
         )
 
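The timeout bump here (5 to 20 minutes, and 5 to 30 minutes for the second optimize_all_tables call further down) suggests these table optimizations routinely outlive the old limit. The activity body is not part of this diff; as a hedged illustration only, an activity of this name would presumably issue ClickHouse OPTIMIZE statements, which force part merges and can run for a long time on large tables. The client API and looping below are assumptions, not code from this repository:

    from clickhouse_driver import Client

    def optimize_all_tables(clickhouse_url: str) -> None:
        client = Client.from_url(clickhouse_url)
        for (table_name,) in client.execute("SHOW TABLES"):
            # OPTIMIZE ... FINAL forces a full merge of the table's data
            # parts, which can take many minutes on large tables, hence the
            # generous start_to_close_timeout on the calling workflow.
            client.execute(f"OPTIMIZE TABLE {table_name} FINAL")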
@@ -87,22 +87,23 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
             retry_policy=RetryPolicy(maximum_attempts=10),
         )
 
-        obs_batches = await workflow.execute_activity(
+        batch_res = await workflow.execute_activity(
             make_observation_batches,
             params_make_observations,
             start_to_close_timeout=timedelta(minutes=30),
             retry_policy=RetryPolicy(maximum_attempts=3),
         )
 
         coroutine_list = []
-        for batch in obs_batches["batches"]:
+        for batch_idx in range(batch_res["batch_count"]):
             batch_params = MakeObservationsFileEntryBatch(
-                file_entry_batch=batch,
+                batch_idx=batch_idx,
                 clickhouse=params.clickhouse,
                 write_batch_size=1_000_000,
                 data_dir=params.data_dir,
                 bucket_date=params.bucket_date,
                 probe_cc=params.probe_cc,
+                test_name=params.test_name,
                 fast_fail=params.fast_fail,
             )
             coroutine_list.append(
@@ -116,7 +117,7 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
         )
         total_msmt_count = sum(await asyncio.gather(*coroutine_list))
 
-        mb_per_sec = round(obs_batches["total_size"] / total_t.s / 10**6, 1)
+        mb_per_sec = round(batch_res["total_size"] / total_t.s / 10**6, 1)
         msmt_per_sec = round(total_msmt_count / total_t.s)
         workflow.logger.info(
             f"finished processing all batches in {total_t.pretty} speed: {mb_per_sec}MB/s ({msmt_per_sec}msmt/s)"
@@ -125,7 +126,7 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
         await workflow.execute_activity(
             optimize_all_tables,
             ClickhouseParams(clickhouse_url=params.clickhouse),
-            start_to_close_timeout=timedelta(minutes=5),
+            start_to_close_timeout=timedelta(minutes=30),
             retry_policy=RetryPolicy(maximum_attempts=10),
         )
 
@@ -141,7 +142,7 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:

         return {
             "measurement_count": total_msmt_count,
-            "size": obs_batches["total_size"],
+            "size": batch_res["total_size"],
             "mb_per_sec": mb_per_sec,
             "bucket_date": params.bucket_date,
             "msmt_per_sec": msmt_per_sec,
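Taken together, the workflow now fans out one activity per batch index and aggregates the returned measurement counts with asyncio.gather, which Temporal's deterministic event loop runs concurrently. Below is a condensed, self-contained sketch of this pattern with the temporalio SDK; the activity name, argument shape, and timeout are illustrative, not this repository's actual values:

    import asyncio
    from datetime import timedelta

    from temporalio import workflow
    from temporalio.common import RetryPolicy

    @workflow.defn
    class FanOutWorkflow:
        @workflow.run
        async def run(self, batch_count: int) -> int:
            # Schedule one activity per batch index; only small scalars are
            # serialized into workflow history.
            coroutine_list = [
                workflow.execute_activity(
                    "process_batch",  # referenced by name; hypothetical activity
                    batch_idx,
                    start_to_close_timeout=timedelta(hours=10),
                    retry_policy=RetryPolicy(maximum_attempts=3),
                )
                for batch_idx in range(batch_count)
            ]
            # gather() awaits all scheduled activities concurrently; summing
            # their results mirrors total_msmt_count in the workflow above.
            return sum(await asyncio.gather(*coroutine_list))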
