diff --git a/docs/configuring.rst b/docs/configuring.rst index 1127121d..a8695b74 100644 --- a/docs/configuring.rst +++ b/docs/configuring.rst @@ -5,12 +5,9 @@ Configuration There are tools to configure some of kopf functionality, like asynchronous tasks behaviour and logging events. -.. note:: - All configuration should be done before importing kopf modules. - Configure logging events -================= +======================== `kopf.config.EventsConfig` allows to set what types of kopf logs should be reflected in events. @@ -23,19 +20,11 @@ Loglevels are: * ``kopf.config.LOGLEVEL_CRITICAL`` .. code-block:: python - :caption: test_example_operator.py - - from kopf import config - - # Now kopf will send events only when error or critical occasion happens - config.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR import kopf - @kopf.on.create('zalando.org', 'v1', 'ephemeralvolumeclaims') - def create_fn(body, **kwargs): - print(f"A handler is called with body: {body}") - + # Now kopf will send events only when error or critical occasion happens + kopf.EventsConfig.events_loglevel = kopf.LOGLEVEL_ERROR Configure Workers ================= @@ -44,17 +33,9 @@ Configure Workers and timeouts for many kinds of tasks. ..
code-block:: python - :caption: test_example_operator.py - - from kopf import config - - # Let's set how many workers can be running simultaneously on per-object event queue - config.WorkersConfig.queue_workers_limit = 10 import kopf - @kopf.on.create('zalando.org', 'v1', 'ephemeralvolumeclaims') - def create_fn(body, **kwargs): - print(f"A handler is called with body: {body}") - + # Let's set how many workers can be running simultaneously on per-object event queue + kopf.WorkersConfig.synchronous_tasks_threadpool_limit = 20 diff --git a/kopf/__init__.py b/kopf/__init__.py index f7c6e219..e35b75b5 100644 --- a/kopf/__init__.py +++ b/kopf/__init__.py @@ -12,6 +12,12 @@ from kopf.config import ( login, configure, + LOGLEVEL_INFO, + LOGLEVEL_WARNING, + LOGLEVEL_ERROR, + LOGLEVEL_CRITICAL, + EventsConfig, + WorkersConfig ) from kopf.events import ( event, diff --git a/kopf/config.py b/kopf/config.py index e921c69b..3666ab02 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -1,6 +1,8 @@ import asyncio +import concurrent.futures import logging +from typing import Optional import click import kubernetes @@ -103,23 +105,37 @@ class WorkersConfig: Used as single point of configuration for kopf.reactor. """ - synchronous_event_post_threadpool_limit = None - """ How many workers can be running simultaneously on event creation operations. """ + threadpool_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None - synchronous_patch_threadpool_limit = None - """ How many workers can be running simultaneously on patch operations. """ - - queue_workers_limit = None # if None, there is no limits to workers number + queue_workers_limit: Optional[int] = None # if None, there is no limits to workers number """ How many workers can be running simultaneously on per-object event queue. 
""" - synchronous_handlers_threadpool_limit = None # if None, calculated by ThreadPoolExecutor based on cpu count - """ How many threads in total can be running simultaneously to handle non-async handler functions. """ + synchronous_tasks_threadpool_limit: Optional[int] = None # if None, calculated by ThreadPoolExecutor based on cpu count + """ How many threads in total can be running simultaneously to handle any non-async tasks. """ - worker_idle_timeout = 5.0 + worker_idle_timeout: float = 5.0 """ How long does a worker can idle before exiting and garbage-collecting.""" - worker_batch_window = 0.1 + worker_batch_window: float = 0.1 """ How fast/slow does a worker deplete the queue when an event is received.""" - worker_exit_timeout = 2.0 + worker_exit_timeout: float = 2.0 """ How long does a worker can work on watcher exit before being cancelled. """ + + @staticmethod + def get_syn_executor() -> concurrent.futures.ThreadPoolExecutor: + if not WorkersConfig.threadpool_executor: + logging.debug('Setting up syn executor') + WorkersConfig.threadpool_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=WorkersConfig.synchronous_tasks_threadpool_limit + ) + return WorkersConfig.threadpool_executor + + @staticmethod + def set_synchronous_tasks_threadpool_limit(new_limit: int): + if new_limit < 1: + return + + WorkersConfig.synchronous_tasks_threadpool_limit = new_limit + if WorkersConfig.threadpool_executor: + WorkersConfig.threadpool_executor._max_workers = new_limit diff --git a/kopf/k8s/events.py b/kopf/k8s/events.py index 90d8e814..9cc47b18 100644 --- a/kopf/k8s/events.py +++ b/kopf/k8s/events.py @@ -1,5 +1,4 @@ import asyncio -import concurrent.futures import datetime import functools import logging @@ -14,9 +13,6 @@ CUT_MESSAGE_INFIX = '...' 
-event_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_event_post_threadpool_limit) - - async def post_event(*, obj, type, reason, message=''): """ Issue an event for the object. @@ -69,7 +65,8 @@ async def post_event(*, obj, type, reason, message=''): try: await loop.run_in_executor( - event_executor, functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body}) + config.WorkersConfig.get_syn_executor(), + functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body}) ) except kubernetes.client.rest.ApiException as e: # Events are helpful but auxiliary, they should not fail the handling cycle. diff --git a/kopf/k8s/patching.py b/kopf/k8s/patching.py index 5fdbd59a..2432af3a 100644 --- a/kopf/k8s/patching.py +++ b/kopf/k8s/patching.py @@ -1,13 +1,10 @@ import asyncio -import concurrent.futures import functools import kubernetes from kopf import config -patch_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_patch_threadpool_limit) - async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): """ @@ -41,4 +38,4 @@ async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): patch_func = api.patch_namespaced_custom_object loop = asyncio.get_running_loop() - await loop.run_in_executor(patch_executor, functools.partial(patch_func, **request_kwargs)) + await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), functools.partial(patch_func, **request_kwargs)) diff --git a/kopf/reactor/invocation.py b/kopf/reactor/invocation.py index d2a30535..642039ab 100644 --- a/kopf/reactor/invocation.py +++ b/kopf/reactor/invocation.py @@ -6,18 +6,12 @@ All of this goes via the same invocation logic and protocol. """ import asyncio -import concurrent.futures import contextvars import functools from typing import Callable -# The executor for the sync-handlers (i.e. regular functions). 
-# TODO: make the limits if sync-handlers configurable? from kopf import config -executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_handlers_threadpool_limit) -# executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) - async def invoke( fn: Callable, @@ -82,9 +76,7 @@ async def invoke( real_fn = functools.partial(context.run, real_fn) loop = asyncio.get_event_loop() - task = loop.run_in_executor(executor, real_fn) - await asyncio.wait([task]) - result = task.result() # re-raises + result = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), real_fn) return result