Skip to content

Commit

Permalink
Change how the executor is passed
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Sep 4, 2024
1 parent 8297a0f commit df710c3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
27 changes: 15 additions & 12 deletions oonipipeline/src/oonipipeline/temporal/activities/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import concurrent.futures
from dataclasses import dataclass
import functools
from typing import Any, Dict, List, Sequence, Tuple, TypedDict
from typing import Any, Dict, List, Optional, Sequence, Tuple, TypedDict
from oonidata.dataclient import (
ccs_set,
list_file_entries_batches,
Expand Down Expand Up @@ -121,6 +121,7 @@ async def make_observations_for_file_entry_batch(
data_dir: pathlib.Path,
clickhouse: str,
write_batch_size: int,
executor: Optional[concurrent.futures.Executor] = None,
fast_fail: bool = False,
) -> int:
loop = asyncio.get_running_loop()
Expand All @@ -135,7 +136,7 @@ async def make_observations_for_file_entry_batch(
log.debug(f"processing file s3://{bucket_name}/{s3path}")
awaitables.append(
loop.run_in_executor(
None,
executor,
functools.partial(
make_observations_for_file_entry,
clickhouse=clickhouse,
Expand Down Expand Up @@ -212,16 +213,18 @@ async def make_observations(params: MakeObservationsParams) -> MakeObservationsR
),
)
measurement_count = 0
for file_entry_batch in batches["batches"]:
measurement_count += await make_observations_for_file_entry_batch(
file_entry_batch=file_entry_batch,
bucket_date=params.bucket_date,
probe_cc=params.probe_cc,
data_dir=pathlib.Path(params.data_dir),
clickhouse=params.clickhouse,
write_batch_size=1_000_000,
fast_fail=False,
)
with concurrent.futures.ProcessPoolExecutor() as process_pool:
for file_entry_batch in batches["batches"]:
measurement_count += await make_observations_for_file_entry_batch(
file_entry_batch=file_entry_batch,
bucket_date=params.bucket_date,
probe_cc=params.probe_cc,
data_dir=pathlib.Path(params.data_dir),
clickhouse=params.clickhouse,
write_batch_size=1_000_000,
fast_fail=False,
executor=process_pool,
)

current_span.set_attribute("total_runtime_ms", tbatch.ms)
# current_span.set_attribute("total_failure_count", total_failure_count)
Expand Down
2 changes: 1 addition & 1 deletion oonipipeline/src/oonipipeline/temporal/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async def worker_main(
def start_workers(temporal_config: TemporalConfig):
max_workers = max(os.cpu_count() or 4, 4)
log.info(f"starting workers with max_workers={max_workers}")
executor = ProcessPoolExecutor(max_workers=max_workers + 2)
executor = ThreadPoolExecutor(max_workers=max_workers + 2)

loop = asyncio.new_event_loop()
loop.set_default_executor(executor)
Expand Down

0 comments on commit df710c3

Please sign in to comment.