diff --git a/docs/configuring.rst b/docs/configuring.rst new file mode 100644 index 00000000..a8695b74 --- /dev/null +++ b/docs/configuring.rst @@ -0,0 +1,41 @@ +================ +Configuration +================ + +There are tools to configure some of the kopf functionality, such as the behaviour of +asynchronous tasks and the logging of events. + + +Configure logging events +======================== + +``kopf.config.EventsConfig`` allows setting which types of kopf logs should be +reflected in events. + +The available log levels are: + +* ``kopf.config.LOGLEVEL_INFO`` +* ``kopf.config.LOGLEVEL_WARNING`` +* ``kopf.config.LOGLEVEL_ERROR`` +* ``kopf.config.LOGLEVEL_CRITICAL`` + +.. code-block:: python + + import kopf + + # Now kopf will send events only for errors and critical occasions + kopf.EventsConfig.events_loglevel = kopf.LOGLEVEL_ERROR + +Configure Workers +================= + +``kopf.config.WorkersConfig`` allows setting the number of workers, the launch periods, +and the timeouts for many kinds of tasks. + +.. code-block:: python + + import kopf + + # Let's set how many threads in total can run the synchronous tasks (e.g. the handlers) + kopf.WorkersConfig.synchronous_tasks_threadpool_limit = 20 + diff --git a/docs/index.rst b/docs/index.rst index 90c34be5..cc8edab6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -36,6 +36,7 @@ Kopf: Kubernetes Operators Framework errors events testing + configuring .. toctree:: :maxdepth: 2 diff --git a/examples/04-events/example.py b/examples/04-events/example.py index f36c33e2..f4666ed6 100644 --- a/examples/04-events/example.py +++ b/examples/04-events/example.py @@ -13,6 +13,7 @@ def create_fn(body, **kwargs): # The shortcuts for the conventional events and common cases. kopf.info(body, reason="SomeReason", message="Some message") kopf.warn(body, reason="SomeReason", message="Some message") + try: raise RuntimeError("Exception text.") except: diff --git a/examples/99-all-at-once/example.py b/examples/99-all-at-once/example.py index 531f703c..384d49f4 100644 --- a/examples/99-all-at-once/example.py +++ b/examples/99-all-at-once/example.py @@ -1,7 +1,6 @@ """ Kubernetes operator example: all the features at once (for debugging & testing).
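To show how these two knobs are meant to be combined, here is a minimal sketch of an operator module that tunes both classes at import time, before ``kopf run`` starts the event loop. The resource group/version/plural and the handler body are placeholders borrowed from the kopf examples, not part of this change:

.. code-block:: python

    import kopf

    # Skip the Normal events; keep posting the Warning and Error ones.
    kopf.EventsConfig.events_loglevel = kopf.LOGLEVEL_WARNING

    # Cap the per-object workers and resize the shared thread pool.
    kopf.WorkersConfig.queue_workers_limit = 10
    kopf.WorkersConfig.set_synchronous_tasks_threadpool_limit(20)


    @kopf.on.create('zalando.org', 'v1', 'kopfexamples')  # placeholder resource
    def create_fn(body, **kwargs):
        kopf.warn(body, reason='Created', message='Handled with the tuned settings.')

Note that the setter also resizes an already-created pool, whereas assigning ``synchronous_tasks_threadpool_limit`` directly only affects a pool that has not been created yet.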
""" - import pprint import time diff --git a/kopf/__init__.py b/kopf/__init__.py index f7c6e219..e35b75b5 100644 --- a/kopf/__init__.py +++ b/kopf/__init__.py @@ -12,6 +12,12 @@ from kopf.config import ( login, configure, + LOGLEVEL_INFO, + LOGLEVEL_WARNING, + LOGLEVEL_ERROR, + LOGLEVEL_CRITICAL, + EventsConfig, + WorkersConfig ) from kopf.events import ( event, diff --git a/kopf/cli.py b/kopf/cli.py index 3e9b996b..2e16b377 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -1,3 +1,4 @@ +import asyncio import functools import click @@ -70,7 +71,8 @@ def freeze(id, message, lifetime, namespace, peering_name, priority): priority=priority, lifetime=lifetime, ) - ourserlves.keepalive() + loop = asyncio.get_event_loop() + loop.run_until_complete(ourserlves.keepalive()) @main.command() @@ -86,4 +88,5 @@ def resume(id, namespace, peering_name): name=peering_name, namespace=namespace, ) - ourselves.disappear() + loop = asyncio.get_event_loop() + loop.run_until_complete(ourselves.disappear()) diff --git a/kopf/config.py b/kopf/config.py index 451817fc..bebe2ca6 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -1,6 +1,8 @@ import asyncio +import concurrent.futures import logging +from typing import Optional import click import kubernetes @@ -11,6 +13,16 @@ format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' +LOGLEVEL_INFO = 20 +""" Event loglevel to log all events. """ +LOGLEVEL_WARNING = 30 +""" Event loglevel to log all events except informational. """ +LOGLEVEL_ERROR = 40 +""" Event loglevel to log only errors and critical events. """ +LOGLEVEL_CRITICAL = 50 +""" Event loglevel to log only critical events(basically - no events). """ + + class LoginError(click.ClickException): """ Raised when the operator cannot login to the API. """ @@ -77,3 +89,56 @@ def configure(debug=None, verbose=None, quiet=None): loop = asyncio.get_event_loop() loop.set_debug(debug) + + +class EventsConfig: + """ + Used to configure events sending behaviour. + """ + + events_loglevel = LOGLEVEL_INFO + """ What events should be logged. """ + + +class WorkersConfig: + """ + Used as single point of configuration for kopf.reactor. + """ + + threadpool_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None + + queue_workers_limit: Optional[int] = None # if None, there is no limits to workers number + """ How many workers can be running simultaneously on per-object event queue. """ + + synchronous_tasks_threadpool_limit: Optional[int] = None # if None, calculated by ThreadPoolExecutor based on cpu count + """ How many threads in total can be running simultaneously to handle any non-async tasks.""" + + worker_idle_timeout: float = 5.0 + """ How long does a worker can idle before exiting and garbage-collecting.""" + + worker_batch_window: float = 0.1 + """ How fast/slow does a worker deplete the queue when an event is received.""" + + worker_exit_timeout: float = 2.0 + """ How long does a worker can work on watcher exit before being cancelled. """ + + @staticmethod + def get_syn_executor() -> concurrent.futures.ThreadPoolExecutor: + if not WorkersConfig.threadpool_executor: + logging.debug('Setting up syn executor') + WorkersConfig.threadpool_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=WorkersConfig.synchronous_tasks_threadpool_limit + ) + return WorkersConfig.threadpool_executor + + @staticmethod + def set_synchronous_tasks_threadpool_limit(new_limit: int): + """ + Call this static method at any time to change synchronous_tasks_threadpool_limit in runtime. 
+    """ + if new_limit < 1: + raise ValueError('Cannot set the threadpool limit lower than 1') + + WorkersConfig.synchronous_tasks_threadpool_limit = new_limit + if WorkersConfig.threadpool_executor: + WorkersConfig.threadpool_executor._max_workers = new_limit diff --git a/kopf/events.py b/kopf/events.py index a77f8628..dd94f4fd 100644 --- a/kopf/events.py +++ b/kopf/events.py @@ -11,36 +11,62 @@ TODO """ +import asyncio import sys +from kopf import config from kopf.k8s import events # TODO: rename it it kopf.log()? kopf.events.log()? kopf.events.warn()? -def event(obj, *, type, reason, message=''): +async def event_async(obj, *, type, reason, message=''): """ Issue an event for the object. """ if isinstance(obj, (list, tuple)): for item in obj: - events.post_event(obj=item, type=type, reason=reason, message=message) + await events.post_event(obj=item, type=type, reason=reason, message=message) else: - events.post_event(obj=obj, type=type, reason=reason, message=message) + await events.post_event(obj=obj, type=type, reason=reason, message=message) # Shortcuts for the only two officially documented event types as of now. # However, any arbitrary strings can be used as an event type to the base function. -def info(obj, *, reason, message=''): - return event(obj, reason=reason, message=message, type='Normal') +async def info_async(obj, *, reason, message=''): + if config.EventsConfig.events_loglevel > config.LOGLEVEL_INFO: + return + await event_async(obj, reason=reason, message=message, type='Normal') -def warn(obj, *, reason, message=''): - return event(obj, reason=reason, message=message, type='Warning') +async def warn_async(obj, *, reason, message=''): + if config.EventsConfig.events_loglevel > config.LOGLEVEL_WARNING: + return + await event_async(obj, reason=reason, message=message, type='Warning') -def exception(obj, *, reason='', message='', exc=None): +async def exception_async(obj, *, reason='', message='', exc=None): + if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR: + return + if exc is None: _, exc, _ = sys.exc_info() reason = reason if reason else type(exc).__name__ message = f'{message} {exc}' if message else f'{exc}' - return event(obj, reason=reason, message=message, type='Error') + await event_async(obj, reason=reason, message=message, type='Error') + + +# The next 4 functions are a synchronous interface to the async event functions: they run the +# coroutine to completion on a temporary event loop (for use from synchronous code only). +def event(obj, *, type, reason, message=''): + asyncio.run(event_async(obj, type=type, reason=reason, message=message)) + + +def info(obj, *, reason, message=''): + asyncio.run(info_async(obj, reason=reason, message=message)) + + +def warn(obj, *, reason, message=''): + asyncio.run(warn_async(obj, reason=reason, message=message)) + + +def exception(obj, *, reason='', message='', exc=None): + asyncio.run(exception_async(obj, reason=reason, message=message, exc=exc)) diff --git a/kopf/k8s/events.py b/kopf/k8s/events.py index fc5a79af..9cc47b18 100644 --- a/kopf/k8s/events.py +++ b/kopf/k8s/events.py @@ -1,15 +1,19 @@ +import asyncio import datetime +import functools import logging import kubernetes.client.rest +from kopf import config + logger = logging.getLogger(__name__) MAX_MESSAGE_LENGTH = 1024 CUT_MESSAGE_INFIX = '...' -def post_event(*, obj, type, reason, message=''): +async def post_event(*, obj, type, reason, message=''): """ Issue an event for the object.
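The level gating in ``info_async``/``warn_async``/``exception_async`` above is a plain numeric comparison against the configured threshold. A small sketch of the rule (the ``is_posted`` helper is hypothetical; only the constants and ``EventsConfig`` come from this change):

.. code-block:: python

    from kopf import config

    def is_posted(event_level: int) -> bool:
        # Hypothetical helper: an event is posted only if its level reaches the threshold.
        return event_level >= config.EventsConfig.events_loglevel

    config.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR
    assert not is_posted(config.LOGLEVEL_INFO)     # kopf.info(...) is skipped
    assert not is_posted(config.LOGLEVEL_WARNING)  # kopf.warn(...) is skipped
    assert is_posted(config.LOGLEVEL_ERROR)        # kopf.exception(...) is still posted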
""" @@ -56,11 +60,13 @@ def post_event(*, obj, type, reason, message=''): event_time=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' ) + api = kubernetes.client.CoreV1Api() + loop = asyncio.get_running_loop() + try: - api = kubernetes.client.CoreV1Api() - api.create_namespaced_event( - namespace=namespace, - body=body, + await loop.run_in_executor( + config.WorkersConfig.get_syn_executor(), + functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body}) ) except kubernetes.client.rest.ApiException as e: # Events are helpful but auxiliary, they should not fail the handling cycle. diff --git a/kopf/k8s/patching.py b/kopf/k8s/patching.py index b43c8036..2432af3a 100644 --- a/kopf/k8s/patching.py +++ b/kopf/k8s/patching.py @@ -1,7 +1,12 @@ +import asyncio +import functools + import kubernetes +from kopf import config + -def patch_obj(*, resource, patch, namespace=None, name=None, body=None): +async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): """ Patch a resource of specific kind. @@ -20,20 +25,17 @@ def patch_obj(*, resource, patch, namespace=None, name=None, body=None): name = body.get('metadata', {}).get('name') if body is not None else name api = kubernetes.client.CustomObjectsApi() - if namespace is None: - api.patch_cluster_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - name=name, - body=patch, - ) - else: - api.patch_namespaced_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace=namespace, - name=name, - body=patch, - ) + request_kwargs = { + 'group': resource.group, + 'version': resource.version, + 'plural': resource.plural, + 'name': name, + 'body': patch + } + patch_func = api.patch_cluster_custom_object + if namespace is not None: + request_kwargs['namespace'] = namespace + patch_func = api.patch_namespaced_custom_object + loop = asyncio.get_running_loop() + + await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), functools.partial(patch_func, **request_kwargs)) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index f1f445a8..db5efe24 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -123,7 +123,7 @@ async def custom_object_handler( # But only once, to reduce the number of API calls and the generated irrelevant events. if patch: logger.debug("Patching with: %r", patch) - patching.patch_obj(resource=resource, patch=patch, body=body) + await patching.patch_obj(resource=resource, patch=patch, body=body) # Sleep strictly after patching, never before -- to keep the status proper. if delay: @@ -205,7 +205,7 @@ async def handle_cause( done = False else: logger.info(f"All handlers succeeded for {title}.") - events.info(cause.body, reason='Success', message=f"All handlers succeeded for {title}.") + await events.info_async(cause.body, reason='Success', message=f"All handlers succeeded for {title}.") done = True else: skip = True @@ -383,14 +383,14 @@ async def _execute( # Definitely retriable error, no matter what is the error-reaction mode. except HandlerRetryError as e: logger.exception(f"Handler {handler.id!r} failed with a retry exception. Will retry.") - events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") + await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. 
Will retry.") status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay) handlers_left.append(handler) # Definitely fatal error, no matter what is the error-reaction mode. except HandlerFatalError as e: logger.exception(f"Handler {handler.id!r} failed with a fatal exception. Will stop.") - events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? @@ -398,19 +398,19 @@ async def _execute( except Exception as e: if retry_on_errors: logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.") - events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") + await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY) handlers_left.append(handler) else: logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.") - events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? # No errors means the handler should be excluded from future runs in this reaction cycle. else: logger.info(f"Handler {handler.id!r} succeeded.") - events.info(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.") + await events.info_async(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.") status.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result) # Provoke the retry of the handling cycle if there were any unfinished handlers, diff --git a/kopf/reactor/invocation.py b/kopf/reactor/invocation.py index 94f46da8..642039ab 100644 --- a/kopf/reactor/invocation.py +++ b/kopf/reactor/invocation.py @@ -6,15 +6,11 @@ All of this goes via the same invocation logic and protocol. """ import asyncio -import concurrent.futures import contextvars import functools from typing import Callable -# The executor for the sync-handlers (i.e. regular functions). -# TODO: make the limits if sync-handlers configurable? -executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) -# executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) +from kopf import config async def invoke( @@ -80,9 +76,7 @@ async def invoke( real_fn = functools.partial(context.run, real_fn) loop = asyncio.get_event_loop() - task = loop.run_in_executor(executor, real_fn) - await asyncio.wait([task]) - result = task.result() # re-raises + result = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), real_fn) return result diff --git a/kopf/reactor/peering.py b/kopf/reactor/peering.py index 0a1513b2..6017f9f8 100644 --- a/kopf/reactor/peering.py +++ b/kopf/reactor/peering.py @@ -131,19 +131,19 @@ def touch(self, *, lifetime: Optional[int] = None): self.deadline = self.lastseen + self.lifetime self.is_dead = self.deadline <= datetime.datetime.utcnow() - def keepalive(self): + async def keepalive(self): """ Add a peer to the peers, and update its alive status. 
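All of the blocking Kubernetes client calls in this change (events, patching, sync handlers) follow the same offloading pattern: wrap the call with ``functools.partial`` and hand it to the shared executor via ``run_in_executor``. A self-contained sketch of that pattern, where ``blocking_call`` is a hypothetical stand-in for a kubernetes client method:

.. code-block:: python

    import asyncio
    import functools

    from kopf import config


    def blocking_call(namespace, body):
        # Hypothetical stand-in for a blocking kubernetes client call.
        return f'patched {namespace}: {body}'


    async def offloaded(namespace, body):
        loop = asyncio.get_running_loop()
        # The shared pool keeps the event loop responsive while the call blocks.
        return await loop.run_in_executor(
            config.WorkersConfig.get_syn_executor(),
            functools.partial(blocking_call, namespace=namespace, body=body),
        )


    print(asyncio.run(offloaded('default', {'spec': {}})))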
""" self.touch() - apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) + await apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) - def disappear(self): + async def disappear(self): """ Remove a peer from the peers (gracefully). """ self.touch(lifetime=0) - apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) + await apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) @staticmethod def _is_peering_exist(name: str, namespace: Optional[str]): @@ -172,7 +172,7 @@ def _is_peering_legacy(name: str, namespace: Optional[str]): return obj is not None -def apply_peers( +async def apply_peers( peers: Iterable[Peer], name: str, namespace: Union[None, str], @@ -188,7 +188,7 @@ def apply_peers( resource = (LEGACY_PEERING_RESOURCE if legacy else CLUSTER_PEERING_RESOURCE if namespace is None else NAMESPACED_PEERING_RESOURCE) - patching.patch_obj(resource=resource, namespace=namespace, name=name, patch=patch) + await patching.patch_obj(resource=resource, namespace=namespace, name=name, patch=patch) async def peers_handler( @@ -225,7 +225,7 @@ async def peers_handler( if autoclean and dead_peers: # NB: sync and blocking, but this is fine. - apply_peers(dead_peers, name=ourselves.name, namespace=ourselves.namespace, legacy=ourselves.legacy) + await apply_peers(dead_peers, name=ourselves.name, namespace=ourselves.namespace, legacy=ourselves.legacy) if prio_peers: if not freeze.is_set(): @@ -249,14 +249,14 @@ async def peers_keepalive( try: while True: logger.debug(f"Peering keep-alive update for {ourselves.id} (priority {ourselves.priority})") - ourselves.keepalive() + await ourselves.keepalive() # How often do we update. Keep limited to avoid k8s api flooding. # Should be slightly less than the lifetime, enough for a patch request to finish. await asyncio.sleep(max(1, int(ourselves.lifetime.total_seconds() - 10))) finally: try: - ourselves.disappear() + await ourselves.disappear() except: pass diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index 129067f5..045affc5 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -31,6 +31,7 @@ import aiojobs +from kopf import config from kopf.k8s import watching from kopf.reactor import handling from kopf.reactor import lifecycles @@ -46,15 +47,6 @@ EOS = object() """ An end-of-stream marker sent from the watcher to the workers. """ -WORKER_IDLE_TIMEOUT = 5.0 -""" How long does a worker can idle before exiting and garbage-collecting.""" - -WORKER_BATCH_WINDOW = 0.1 -""" How fast/slow does a worker deplete the queue when an event is received.""" - -WORKER_EXIT_TIMEOUT = 2.0 -""" How long does a worker can work on watcher exit before being cancelled. """ - # TODO: add the label_selector support for the dev-mode? async def watcher( @@ -77,7 +69,7 @@ async def watcher( # All per-object workers are handled as fire-and-forget jobs via the scheduler, # and communicated via the per-object event queues. - scheduler = await aiojobs.create_scheduler(limit=10) + scheduler = await aiojobs.create_scheduler(limit=config.WorkersConfig.queue_workers_limit) queues = {} try: # Either use the existing object's queue, or create a new one together with the per-object job. @@ -128,14 +120,16 @@ async def worker( # If the queue is filled, use the latest event only (within the short timeframe). # If an EOS marker is received, handle the last real event, then finish the worker ASAP. 
try: - event = await asyncio.wait_for(queue.get(), timeout=WORKER_IDLE_TIMEOUT) + event = await asyncio.wait_for(queue.get(), timeout=config.WorkersConfig.worker_idle_timeout) except asyncio.TimeoutError: break else: try: while True: prev_event = event - next_event = await asyncio.wait_for(queue.get(), timeout=WORKER_BATCH_WINDOW) + next_event = await asyncio.wait_for( + queue.get(), timeout=config.WorkersConfig.worker_batch_window + ) shouldstop = shouldstop or next_event is EOS event = prev_event if next_event is EOS else next_event except asyncio.TimeoutError: @@ -247,8 +241,12 @@ def run( # The errors in the cancellation stage will be ignored anyway (never re-raised below). for task in pending: task.cancel() - cancelled, pending = loop.run_until_complete(asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)) - assert not pending # must be empty by now, the tasks are either done or cancelled. + if pending: + cancelled, pending = loop.run_until_complete(asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)) + assert not pending # must be empty by now, the tasks are either done or cancelled. + else: + # Nothing was pending, hence nothing was cancelled either. + cancelled = [] # Check the results of the non-cancelled tasks, and re-raise of there were any exceptions. # The cancelled tasks are not re-raised, as it is a normal flow for the "first-completed" run. @@ -269,8 +267,10 @@ async def _wait_for_depletion(*, scheduler, queues): # Wait for the queues to be depleted, but only if there are some workers running. # Continue with the tasks termination if the timeout is reached, no matter the queues. started = time.perf_counter() - while queues and scheduler.active_count and time.perf_counter() - started < WORKER_EXIT_TIMEOUT: - await asyncio.sleep(WORKER_EXIT_TIMEOUT / 100.) + while queues and \ + scheduler.active_count and \ + time.perf_counter() - started < config.WorkersConfig.worker_exit_timeout: + await asyncio.sleep(config.WorkersConfig.worker_exit_timeout / 100.) # The last check if the termination is going to be graceful or not.
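With the module-level ``WORKER_*`` constants removed, the batching and shutdown behaviour is now tuned through ``WorkersConfig`` instead. A short sketch of how an operator or a test might speed these up; the concrete values are illustrative only:

.. code-block:: python

    import kopf

    kopf.WorkersConfig.worker_idle_timeout = 1.0   # default 5.0: idle time before a worker exits
    kopf.WorkersConfig.worker_batch_window = 0.05  # default 0.1: debounce window for event bursts
    kopf.WorkersConfig.worker_exit_timeout = 1.0   # default 2.0: grace period on watcher exit
    kopf.WorkersConfig.queue_workers_limit = 50    # default None: no limit on per-object workers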
if queues: diff --git a/tests/k8s/test_events.py b/tests/k8s/test_events.py index 6c2f63ed..fe1b2d0f 100644 --- a/tests/k8s/test_events.py +++ b/tests/k8s/test_events.py @@ -4,7 +4,7 @@ from kopf.k8s.events import post_event -def test_posting(client_mock): +async def test_posting(client_mock): result = object() apicls_mock = client_mock.CoreV1Api apicls_mock.return_value.create_namespaced_event.return_value = result @@ -15,7 +15,7 @@ def test_posting(client_mock): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} - post_event(obj=obj, type='type', reason='reason', message='message') + await post_event(obj=obj, type='type', reason='reason', message='message') assert postfn_mock.called assert postfn_mock.call_count == 1 @@ -36,7 +36,7 @@ def test_posting(client_mock): assert event.involved_object['uid'] == 'uid' -def test_type_is_v1_not_v1beta1(client_mock): +async def test_type_is_v1_not_v1beta1(client_mock): apicls_mock = client_mock.CoreV1Api postfn_mock = apicls_mock.return_value.create_namespaced_event @@ -45,14 +45,14 @@ def test_type_is_v1_not_v1beta1(client_mock): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} - post_event(obj=obj, type='type', reason='reason', message='message') + await post_event(obj=obj, type='type', reason='reason', message='message') event = postfn_mock.call_args_list[0][1]['body'] assert isinstance(event, client_mock.V1Event) assert not isinstance(event, client_mock.V1beta1Event) -def test_api_errors_logged_but_suppressed(client_mock, assert_logs): +async def test_api_errors_logged_but_suppressed(client_mock, assert_logs): error = client_mock.rest.ApiException('boo!') apicls_mock = client_mock.CoreV1Api apicls_mock.return_value.create_namespaced_event.side_effect = error @@ -63,7 +63,7 @@ def test_api_errors_logged_but_suppressed(client_mock, assert_logs): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} - post_event(obj=obj, type='type', reason='reason', message='message') + await post_event(obj=obj, type='type', reason='reason', message='message') assert postfn_mock.called assert_logs([ @@ -71,7 +71,7 @@ def test_api_errors_logged_but_suppressed(client_mock, assert_logs): ]) -def test_regular_errors_escalate(client_mock): +async def test_regular_errors_escalate(client_mock): error = Exception('boo!') apicls_mock = client_mock.CoreV1Api apicls_mock.return_value.create_namespaced_event.side_effect = error @@ -83,12 +83,12 @@ def test_regular_errors_escalate(client_mock): 'uid': 'uid'}} with pytest.raises(Exception) as excinfo: - post_event(obj=obj, type='type', reason='reason', message='message') + await post_event(obj=obj, type='type', reason='reason', message='message') assert excinfo.value is error -def test_message_is_cut_to_max_length(client_mock): +async def test_message_is_cut_to_max_length(client_mock): result = object() apicls_mock = client_mock.CoreV1Api apicls_mock.return_value.create_namespaced_event.return_value = result @@ -100,7 +100,7 @@ def test_message_is_cut_to_max_length(client_mock): 'name': 'name', 'uid': 'uid'}} message = 'start' + ('x' * 2048) + 'end' - post_event(obj=obj, type='type', reason='reason', message=message) + await post_event(obj=obj, type='type', reason='reason', message=message) event = postfn_mock.call_args_list[0][1]['body'] assert len(event.message) <= 1024 # max supported API message length diff --git a/tests/k8s/test_patching.py b/tests/k8s/test_patching.py index 8d255706..f3598843 100644 --- a/tests/k8s/test_patching.py +++ b/tests/k8s/test_patching.py @@ -1,16 +1,18 @@ 
+import asyncio + import pytest from asynctest import call from kopf.k8s.patching import patch_obj -def test_by_name_clustered(client_mock, resource): +async def test_by_name_clustered(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object - res = patch_obj(resource=resource, namespace=None, name='name1', patch=patch) + res = await patch_obj(resource=resource, namespace=None, name='name1', patch=patch) assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -24,13 +26,13 @@ def test_by_name_clustered(client_mock, resource): )] -def test_by_name_namespaced(client_mock, resource): +async def test_by_name_namespaced(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_cluster_custom_object mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object - res = patch_obj(resource=resource, namespace='ns1', name='name1', patch=patch) + res = await patch_obj(resource=resource, namespace='ns1', name='name1', patch=patch) assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -45,14 +47,14 @@ def test_by_name_namespaced(client_mock, resource): )] -def test_by_body_clustered(client_mock, resource): +async def test_by_body_clustered(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object body = {'metadata': {'name': 'name1'}} - res = patch_obj(resource=resource, body=body, patch=patch) + res = await patch_obj(resource=resource, body=body, patch=patch) assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -66,14 +68,14 @@ def test_by_body_clustered(client_mock, resource): )] -def test_by_body_namespaced(client_mock, resource): +async def test_by_body_namespaced(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_cluster_custom_object mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} - res = patch_obj(resource=resource, body=body, patch=patch) + res = await patch_obj(resource=resource, body=body, patch=patch) assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -88,7 +90,7 @@ def test_by_body_namespaced(client_mock, resource): )] -def test_raises_when_body_conflicts_with_namespace(client_mock, resource): +async def test_raises_when_body_conflicts_with_namespace(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object @@ -96,13 +98,13 @@ def test_raises_when_body_conflicts_with_namespace(client_mock, resource): body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): - patch_obj(resource=resource, body=body, namespace='ns1', patch=patch) + await patch_obj(resource=resource, body=body, namespace='ns1', patch=patch) assert not sidefn_mock.called assert not mainfn_mock.called -def test_raises_when_body_conflicts_with_name(client_mock, resource): +async def test_raises_when_body_conflicts_with_name(client_mock, resource): patch = object() apicls_mock 
= client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object @@ -110,13 +112,13 @@ def test_raises_when_body_conflicts_with_name(client_mock, resource): body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): - patch_obj(resource=resource, body=body, name='name1', patch=patch) + await patch_obj(resource=resource, body=body, name='name1', patch=patch) assert not sidefn_mock.called assert not mainfn_mock.called -def test_raises_when_body_conflicts_with_ids(client_mock, resource): +async def test_raises_when_body_conflicts_with_ids(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object @@ -124,7 +126,7 @@ def test_raises_when_body_conflicts_with_ids(client_mock, resource): body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): - patch_obj(resource=resource, body=body, namespace='ns1', name='name1', patch=patch) + await patch_obj(resource=resource, body=body, namespace='ns1', name='name1', patch=patch) assert not sidefn_mock.called assert not mainfn_mock.called diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index 193433d9..e8641e32 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -115,9 +115,9 @@ async def test_event_batching(mocker, resource, handler, timer, stream, events, """ Verify that only the last event per uid is actually handled. """ # Override the default timeouts to make the tests faster. - mocker.patch('kopf.reactor.queueing.WORKER_IDLE_TIMEOUT', 0.5) - mocker.patch('kopf.reactor.queueing.WORKER_BATCH_WINDOW', 0.1) - mocker.patch('kopf.reactor.queueing.WORKER_EXIT_TIMEOUT', 0.5) + mocker.patch('kopf.config.WorkersConfig.worker_idle_timeout', 0.5) + mocker.patch('kopf.config.WorkersConfig.worker_batch_window', 0.1) + mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) # Inject the events of unique objects - to produce few queues/workers. stream.return_value = iter(events) @@ -132,8 +132,8 @@ async def test_event_batching(mocker, resource, handler, timer, stream, events, # Significantly less than the queue getting timeout, but sufficient to run. # 2 <= 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough. - from kopf.reactor.queueing import WORKER_BATCH_WINDOW - assert timer.seconds < WORKER_BATCH_WINDOW + CODE_OVERHEAD + from kopf import config + assert timer.seconds < config.WorkersConfig.worker_batch_window + CODE_OVERHEAD # Was the handler called at all? Awaited as needed for async fns? assert handler.awaited @@ -165,9 +165,9 @@ async def test_event_batching(mocker, resource, handler, timer, stream, events, async def test_garbage_collection_of_queues(mocker, stream, events, unique, worker_spy): # Override the default timeouts to make the tests faster. - mocker.patch('kopf.reactor.queueing.WORKER_IDLE_TIMEOUT', 0.5) - mocker.patch('kopf.reactor.queueing.WORKER_BATCH_WINDOW', 0.1) - mocker.patch('kopf.reactor.queueing.WORKER_EXIT_TIMEOUT', 0.5) + mocker.patch('kopf.config.WorkersConfig.worker_idle_timeout', 0.5) + mocker.patch('kopf.config.WorkersConfig.worker_batch_window', 0.1) + mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) # Inject the events of unique objects - to produce few queues/workers. 
stream.return_value = iter(events) @@ -187,9 +187,9 @@ async def test_garbage_collection_of_queues(mocker, stream, events, unique, work # Give the workers some time to finish waiting for the events. # Once the idle timeout, they will exit and gc their individual queues. - from kopf.reactor.queueing import WORKER_IDLE_TIMEOUT, WORKER_BATCH_WINDOW - await asyncio.sleep(WORKER_BATCH_WINDOW) # depleting the queues. - await asyncio.sleep(WORKER_IDLE_TIMEOUT) # idling on empty queues. + from kopf import config + await asyncio.sleep(config.WorkersConfig.worker_batch_window) # depleting the queues. + await asyncio.sleep(config.WorkersConfig.worker_idle_timeout) # idling on empty queues. await asyncio.sleep(CODE_OVERHEAD) # The mutable(!) queues dict is now empty, i.e. garbage-collected.
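Since the patched functions are now coroutines, the affected tests are declared ``async def`` and need an async-aware test runner. A minimal sketch of such a test, assuming pytest-asyncio (or an equivalent plugin configured in the project's conftest) drives the coroutine; the test name and the patched value are illustrative:

.. code-block:: python

    import asyncio

    import pytest


    @pytest.mark.asyncio  # assumption: pytest-asyncio or an equivalent plugin is configured
    async def test_worker_timeouts_can_be_patched(mocker):
        from kopf import config

        mocker.patch('kopf.config.WorkersConfig.worker_batch_window', 0.01)
        assert config.WorkersConfig.worker_batch_window == 0.01

        # The test body runs inside an event loop, so awaiting is allowed here.
        await asyncio.sleep(0)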