Skip to content

Commit

Permalink
Tweak threadpool shutdown logic
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Sep 4, 2024
1 parent 467362e commit 6d16ced
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions oonipipeline/src/oonipipeline/temporal/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,10 @@
]


async def worker_main(temporal_config: TemporalConfig):
async def worker_main(
temporal_config: TemporalConfig, max_workers: int, thread_pool: ThreadPoolExecutor
):
client = await temporal_connect(temporal_config=temporal_config)
max_workers = max(os.cpu_count() or 4, 4)
log.info(f"starting workers with max_workers={max_workers}")
thread_pool = ThreadPoolExecutor(max_workers=max_workers + 2)
async with Worker(
client,
task_queue=TASK_QUEUE_NAME,
Expand All @@ -69,17 +68,26 @@ async def worker_main(temporal_config: TemporalConfig):
log.info("Workers started, ctrl-c to exit")
await interrupt_event.wait()
log.info("Shutting down")
thread_pool.shutdown(wait=True, cancel_futures=True)
log.info("Thread pool stopped")


def start_workers(temporal_config: TemporalConfig):
max_workers = max(os.cpu_count() or 4, 4)
log.info(f"starting workers with max_workers={max_workers}")
thread_pool = ThreadPoolExecutor(max_workers=max_workers + 2)
loop = asyncio.new_event_loop()
# TODO(art): Investigate if we want to upgrade to python 3.12 and use this
# instead
# loop.set_task_factory(asyncio.eager_task_factory)
try:
loop.run_until_complete(worker_main(temporal_config=temporal_config))
loop.run_until_complete(
worker_main(
temporal_config=temporal_config,
max_workers=max_workers,
thread_pool=thread_pool,
)
)
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
thread_pool.shutdown(wait=True, cancel_futures=True)
log.info("shut down thread pool")

0 comments on commit 6d16ced

Please sign in to comment.