From 2bd482822de9b2657353f27ab55c307691dae63b Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Wed, 12 Jun 2019 17:33:10 +0300 Subject: [PATCH 01/17] 1. Added config - kopf.config.WorkersConfig - class attributes based options to control async workers. So user can just import it before all other kopf modules and change values to required. Maybe not the best decision, but any. 2. So now can be configured number of next types of workers: queue processing, object patching, event creation. 3. Blocking synchronous operations such as event creation, changed objects patching, are now asynchronous (i guess). Existing tests updated for new async functions. E2E tests required to test performance(how fast large number of new/changed objects can be processed). Such tests would prove efficiency of kopf concurrency. --- examples/04-events/example.py | 18 ++++++++---- examples/99-all-at-once/example.py | 10 ++++--- kopf/cli.py | 7 +++-- kopf/config.py | 25 +++++++++++++++++ kopf/events.py | 18 ++++++------ kopf/k8s/events.py | 22 +++++++++++---- kopf/k8s/patching.py | 44 ++++++++++++++++++------------ kopf/reactor/handling.py | 14 +++++----- kopf/reactor/invocation.py | 4 ++- kopf/reactor/peering.py | 18 ++++++------ kopf/reactor/queueing.py | 20 ++++---------- tests/k8s/test_patching.py | 25 +++++++++++++---- tests/reactor/test_queueing.py | 22 +++++++-------- 13 files changed, 156 insertions(+), 91 deletions(-) diff --git a/examples/04-events/example.py b/examples/04-events/example.py index f36c33e2..b3a8c52f 100644 --- a/examples/04-events/example.py +++ b/examples/04-events/example.py @@ -1,19 +1,25 @@ """ Send the custom events for the handled or other objects. """ +import asyncio + import kopf @kopf.on.create('zalando.org', 'v1', 'kopfexamples') def create_fn(body, **kwargs): - # The all-purpose function for the vent creation. - kopf.event(body, type="SomeType", reason="SomeReason", message="Some message") + asyncio.wait([ + # The all-purpose function for the vent creation. + kopf.event(body, type="SomeType", reason="SomeReason", message="Some message"), + + # The shortcuts for the conventional events and common cases. + kopf.info(body, reason="SomeReason", message="Some message"), + kopf.warn(body, reason="SomeReason", message="Some message"), + ]) - # The shortcuts for the conventional events and common cases. - kopf.info(body, reason="SomeReason", message="Some message") - kopf.warn(body, reason="SomeReason", message="Some message") try: raise RuntimeError("Exception text.") except: - kopf.exception(body, reason="SomeReason", message="Some exception:") + loop = asyncio.get_event_loop() + loop.run_until_complete(kopf.exception(body, reason="SomeReason", message="Some exception:")) diff --git a/examples/99-all-at-once/example.py b/examples/99-all-at-once/example.py index 531f703c..5f9991b6 100644 --- a/examples/99-all-at-once/example.py +++ b/examples/99-all-at-once/example.py @@ -1,7 +1,7 @@ """ Kubernetes operator example: all the features at once (for debugging & testing). """ - +import asyncio import pprint import time @@ -17,9 +17,11 @@ def create_1(body, meta, spec, status, **kwargs): children = _create_children(owner=body) - kopf.info(body, reason='AnyReason') - kopf.event(body, type='Warning', reason='SomeReason', message="Cannot do something") - kopf.event(children, type='Normal', reason='SomeReason', message="Created as part of the job1step") + asyncio.wait([ + kopf.info(body, reason='AnyReason'), + kopf.event(body, type='Warning', reason='SomeReason', message="Cannot do something"), + kopf.event(children, type='Normal', reason='SomeReason', message="Created as part of the job1step"), + ]) return {'job1-status': 100} diff --git a/kopf/cli.py b/kopf/cli.py index 3e9b996b..2e16b377 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -1,3 +1,4 @@ +import asyncio import functools import click @@ -70,7 +71,8 @@ def freeze(id, message, lifetime, namespace, peering_name, priority): priority=priority, lifetime=lifetime, ) - ourserlves.keepalive() + loop = asyncio.get_event_loop() + loop.run_until_complete(ourserlves.keepalive()) @main.command() @@ -86,4 +88,5 @@ def resume(id, namespace, peering_name): name=peering_name, namespace=namespace, ) - ourselves.disappear() + loop = asyncio.get_event_loop() + loop.run_until_complete(ourselves.disappear()) diff --git a/kopf/config.py b/kopf/config.py index 451817fc..780fd34a 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -77,3 +77,28 @@ def configure(debug=None, verbose=None, quiet=None): loop = asyncio.get_event_loop() loop.set_debug(debug) + + +class WorkersConfig: + """Used as single point of configuration for kopf.reactor""" + + """How many workers can be running simultaneously on event creation operations""" + synchronous_event_post_workers_limit = None + + """How many workers can be running simultaneously on patch operations""" + synchronous_patch_workers_limit = None + + """How many workers can be running simultaneously on per-object event queue""" + queue_workers_limit = None # if None, there is no limits to workers number + + """How many threads in total can be running simultaneously to handle non-async handler functions""" + synchronous_handlers_threadpool_limit = None # if None, calculated by ThreadPoolExecutor based on cpu count + + """How long does a worker can idle before exiting and garbage-collecting.""" + worker_idle_timeout = 5.0 + + """How fast/slow does a worker deplete the queue when an event is received.""" + worker_batch_window = 0.1 + + """How long does a worker can work on watcher exit before being cancelled. """ + worker_exit_timeout = 2.0 diff --git a/kopf/events.py b/kopf/events.py index a77f8628..29f54743 100644 --- a/kopf/events.py +++ b/kopf/events.py @@ -17,30 +17,30 @@ # TODO: rename it it kopf.log()? kopf.events.log()? kopf.events.warn()? -def event(obj, *, type, reason, message=''): +async def event(obj, *, type, reason, message=''): """ Issue an event for the object. """ if isinstance(obj, (list, tuple)): for item in obj: - events.post_event(obj=item, type=type, reason=reason, message=message) + await events.post_event(obj=item, type=type, reason=reason, message=message) else: - events.post_event(obj=obj, type=type, reason=reason, message=message) + await events.post_event(obj=obj, type=type, reason=reason, message=message) # Shortcuts for the only two officially documented event types as of now. # However, any arbitrary strings can be used as an event type to the base function. -def info(obj, *, reason, message=''): - return event(obj, reason=reason, message=message, type='Normal') +async def info(obj, *, reason, message=''): + return await event(obj, reason=reason, message=message, type='Normal') -def warn(obj, *, reason, message=''): - return event(obj, reason=reason, message=message, type='Warning') +async def warn(obj, *, reason, message=''): + return await event(obj, reason=reason, message=message, type='Warning') -def exception(obj, *, reason='', message='', exc=None): +async def exception(obj, *, reason='', message='', exc=None): if exc is None: _, exc, _ = sys.exc_info() reason = reason if reason else type(exc).__name__ message = f'{message} {exc}' if message else f'{exc}' - return event(obj, reason=reason, message=message, type='Error') + return await event(obj, reason=reason, message=message, type='Error') diff --git a/kopf/k8s/events.py b/kopf/k8s/events.py index fc5a79af..e9f0a782 100644 --- a/kopf/k8s/events.py +++ b/kopf/k8s/events.py @@ -1,15 +1,23 @@ +import asyncio +import concurrent.futures import datetime +import functools import logging import kubernetes.client.rest +from kopf.config import WorkersConfig + logger = logging.getLogger(__name__) MAX_MESSAGE_LENGTH = 1024 CUT_MESSAGE_INFIX = '...' -def post_event(*, obj, type, reason, message=''): +event_executor = concurrent.futures.ThreadPoolExecutor(max_workers=WorkersConfig.synchronous_event_post_workers_limit) + + +async def post_event(*, obj, type, reason, message=''): """ Issue an event for the object. """ @@ -56,11 +64,15 @@ def post_event(*, obj, type, reason, message=''): event_time=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' ) + api = kubernetes.client.CoreV1Api() + loop = asyncio.get_event_loop() + if not loop: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: - api = kubernetes.client.CoreV1Api() - api.create_namespaced_event( - namespace=namespace, - body=body, + await loop.run_in_executor( + event_executor, functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body}) ) except kubernetes.client.rest.ApiException as e: # Events are helpful but auxiliary, they should not fail the handling cycle. diff --git a/kopf/k8s/patching.py b/kopf/k8s/patching.py index b43c8036..f8e36a4a 100644 --- a/kopf/k8s/patching.py +++ b/kopf/k8s/patching.py @@ -1,7 +1,15 @@ +import asyncio +import concurrent.futures +import functools + import kubernetes +from kopf.config import WorkersConfig + +patch_executor = concurrent.futures.ThreadPoolExecutor(max_workers=WorkersConfig.synchronous_patch_workers_limit) -def patch_obj(*, resource, patch, namespace=None, name=None, body=None): + +async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): """ Patch a resource of specific kind. @@ -20,20 +28,20 @@ def patch_obj(*, resource, patch, namespace=None, name=None, body=None): name = body.get('metadata', {}).get('name') if body is not None else name api = kubernetes.client.CustomObjectsApi() - if namespace is None: - api.patch_cluster_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - name=name, - body=patch, - ) - else: - api.patch_namespaced_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace=namespace, - name=name, - body=patch, - ) + request_kwargs = { + 'group': resource.group, + 'version': resource.version, + 'plural': resource.plural, + 'name': name, + 'body': patch + } + patch_func = api.patch_cluster_custom_object + if namespace is not None: + request_kwargs['namespace'] = namespace + patch_func = api.patch_namespaced_custom_object + loop = asyncio.get_event_loop() + if not loop: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + await loop.run_in_executor(patch_executor, functools.partial(patch_func, **request_kwargs)) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 42e655a5..19f5627d 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -122,7 +122,7 @@ async def custom_object_handler( # But only once, to reduce the number of API calls and the generated irrelevant events. if patch: logger.debug("Patching with: %r", patch) - patching.patch_obj(resource=resource, patch=patch, body=body) + await patching.patch_obj(resource=resource, patch=patch, body=body) # Sleep strictly after patching, never before -- to keep the status proper. if delay: @@ -193,7 +193,7 @@ async def handle_cause( done = False else: logger.info(f"All handlers succeeded for {title}.") - events.info(cause.body, reason='Success', message=f"All handlers succeeded for {title}.") + await events.info(cause.body, reason='Success', message=f"All handlers succeeded for {title}.") done = True # Regular causes also do some implicit post-handling when all handlers are done. @@ -369,14 +369,14 @@ async def _execute( # Definitely retriable error, no matter what is the error-reaction mode. except HandlerRetryError as e: logger.exception(f"Handler {handler.id!r} failed with a retry exception. Will retry.") - events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") + await events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay) handlers_left.append(handler) # Definitely fatal error, no matter what is the error-reaction mode. except HandlerFatalError as e: logger.exception(f"Handler {handler.id!r} failed with a fatal exception. Will stop.") - events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + await events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? @@ -384,19 +384,19 @@ async def _execute( except Exception as e: if retry_on_errors: logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.") - events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") + await events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY) handlers_left.append(handler) else: logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.") - events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + await events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? # No errors means the handler should be excluded from future runs in this reaction cycle. else: logger.info(f"Handler {handler.id!r} succeeded.") - events.info(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.") + await events.info(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.") status.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result) # Provoke the retry of the handling cycle if there were any unfinished handlers, diff --git a/kopf/reactor/invocation.py b/kopf/reactor/invocation.py index 94f46da8..d6a519bf 100644 --- a/kopf/reactor/invocation.py +++ b/kopf/reactor/invocation.py @@ -13,7 +13,9 @@ # The executor for the sync-handlers (i.e. regular functions). # TODO: make the limits if sync-handlers configurable? -executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) +from kopf.config import WorkersConfig + +executor = concurrent.futures.ThreadPoolExecutor(max_workers=WorkersConfig.synchronous_handlers_threadpool_limit) # executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) diff --git a/kopf/reactor/peering.py b/kopf/reactor/peering.py index 0a1513b2..6017f9f8 100644 --- a/kopf/reactor/peering.py +++ b/kopf/reactor/peering.py @@ -131,19 +131,19 @@ def touch(self, *, lifetime: Optional[int] = None): self.deadline = self.lastseen + self.lifetime self.is_dead = self.deadline <= datetime.datetime.utcnow() - def keepalive(self): + async def keepalive(self): """ Add a peer to the peers, and update its alive status. """ self.touch() - apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) + await apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) - def disappear(self): + async def disappear(self): """ Remove a peer from the peers (gracefully). """ self.touch(lifetime=0) - apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) + await apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) @staticmethod def _is_peering_exist(name: str, namespace: Optional[str]): @@ -172,7 +172,7 @@ def _is_peering_legacy(name: str, namespace: Optional[str]): return obj is not None -def apply_peers( +async def apply_peers( peers: Iterable[Peer], name: str, namespace: Union[None, str], @@ -188,7 +188,7 @@ def apply_peers( resource = (LEGACY_PEERING_RESOURCE if legacy else CLUSTER_PEERING_RESOURCE if namespace is None else NAMESPACED_PEERING_RESOURCE) - patching.patch_obj(resource=resource, namespace=namespace, name=name, patch=patch) + await patching.patch_obj(resource=resource, namespace=namespace, name=name, patch=patch) async def peers_handler( @@ -225,7 +225,7 @@ async def peers_handler( if autoclean and dead_peers: # NB: sync and blocking, but this is fine. - apply_peers(dead_peers, name=ourselves.name, namespace=ourselves.namespace, legacy=ourselves.legacy) + await apply_peers(dead_peers, name=ourselves.name, namespace=ourselves.namespace, legacy=ourselves.legacy) if prio_peers: if not freeze.is_set(): @@ -249,14 +249,14 @@ async def peers_keepalive( try: while True: logger.debug(f"Peering keep-alive update for {ourselves.id} (priority {ourselves.priority})") - ourselves.keepalive() + await ourselves.keepalive() # How often do we update. Keep limited to avoid k8s api flooding. # Should be slightly less than the lifetime, enough for a patch request to finish. await asyncio.sleep(max(1, int(ourselves.lifetime.total_seconds() - 10))) finally: try: - ourselves.disappear() + await ourselves.disappear() except: pass diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index 129067f5..b9f54069 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -31,6 +31,7 @@ import aiojobs +from kopf.config import WorkersConfig from kopf.k8s import watching from kopf.reactor import handling from kopf.reactor import lifecycles @@ -46,15 +47,6 @@ EOS = object() """ An end-of-stream marker sent from the watcher to the workers. """ -WORKER_IDLE_TIMEOUT = 5.0 -""" How long does a worker can idle before exiting and garbage-collecting.""" - -WORKER_BATCH_WINDOW = 0.1 -""" How fast/slow does a worker deplete the queue when an event is received.""" - -WORKER_EXIT_TIMEOUT = 2.0 -""" How long does a worker can work on watcher exit before being cancelled. """ - # TODO: add the label_selector support for the dev-mode? async def watcher( @@ -77,7 +69,7 @@ async def watcher( # All per-object workers are handled as fire-and-forget jobs via the scheduler, # and communicated via the per-object event queues. - scheduler = await aiojobs.create_scheduler(limit=10) + scheduler = await aiojobs.create_scheduler(limit=WorkersConfig.queue_workers_limit) queues = {} try: # Either use the existing object's queue, or create a new one together with the per-object job. @@ -128,14 +120,14 @@ async def worker( # If the queue is filled, use the latest event only (within the short timeframe). # If an EOS marker is received, handle the last real event, then finish the worker ASAP. try: - event = await asyncio.wait_for(queue.get(), timeout=WORKER_IDLE_TIMEOUT) + event = await asyncio.wait_for(queue.get(), timeout=WorkersConfig.worker_idle_timeout) except asyncio.TimeoutError: break else: try: while True: prev_event = event - next_event = await asyncio.wait_for(queue.get(), timeout=WORKER_BATCH_WINDOW) + next_event = await asyncio.wait_for(queue.get(), timeout=WorkersConfig.worker_batch_window) shouldstop = shouldstop or next_event is EOS event = prev_event if next_event is EOS else next_event except asyncio.TimeoutError: @@ -269,8 +261,8 @@ async def _wait_for_depletion(*, scheduler, queues): # Wait for the queues to be depleted, but only if there are some workers running. # Continue with the tasks termination if the timeout is reached, no matter the queues. started = time.perf_counter() - while queues and scheduler.active_count and time.perf_counter() - started < WORKER_EXIT_TIMEOUT: - await asyncio.sleep(WORKER_EXIT_TIMEOUT / 100.) + while queues and scheduler.active_count and time.perf_counter() - started < WorkersConfig.worker_exit_timeout: + await asyncio.sleep(WorkersConfig.worker_exit_timeout / 100.) # The last check if the termination is going to be graceful or not. if queues: diff --git a/tests/k8s/test_patching.py b/tests/k8s/test_patching.py index 8d255706..6a7f17b0 100644 --- a/tests/k8s/test_patching.py +++ b/tests/k8s/test_patching.py @@ -1,3 +1,5 @@ +import asyncio + import pytest from asynctest import call @@ -10,7 +12,10 @@ def test_by_name_clustered(client_mock, resource): sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object - res = patch_obj(resource=resource, namespace=None, name='name1', patch=patch) + task = asyncio.create_task(patch_obj(resource=resource, namespace=None, name='name1', patch=patch)) + loop = asyncio.get_event_loop() + loop.run_until_complete(task) + res = task.result assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -30,7 +35,10 @@ def test_by_name_namespaced(client_mock, resource): sidefn_mock = apicls_mock.return_value.patch_cluster_custom_object mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object - res = patch_obj(resource=resource, namespace='ns1', name='name1', patch=patch) + task = asyncio.create_task(patch_obj(resource=resource, namespace='ns1', name='name1', patch=patch)) + loop = asyncio.get_event_loop() + loop.run_until_complete(task) + res = task.result assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -52,7 +60,10 @@ def test_by_body_clustered(client_mock, resource): mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object body = {'metadata': {'name': 'name1'}} - res = patch_obj(resource=resource, body=body, patch=patch) + task = asyncio.create_task(patch_obj(resource=resource, body=body, patch=patch)) + loop = asyncio.get_event_loop() + loop.run_until_complete(task) + res = task.result assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -73,7 +84,10 @@ def test_by_body_namespaced(client_mock, resource): mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} - res = patch_obj(resource=resource, body=body, patch=patch) + task = asyncio.create_task(patch_obj(resource=resource, body=body, patch=patch)) + loop = asyncio.get_event_loop() + loop.run_until_complete(task) + res = task.result assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -96,7 +110,8 @@ def test_raises_when_body_conflicts_with_namespace(client_mock, resource): body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): - patch_obj(resource=resource, body=body, namespace='ns1', patch=patch) + loop = asyncio.get_event_loop() + loop.run_until_complete(patch_obj(resource=resource, body=body, namespace='ns1', patch=patch)) assert not sidefn_mock.called assert not mainfn_mock.called diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index bd545e4c..43888c66 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -115,9 +115,9 @@ async def test_event_batching(mocker, resource, handler, timer, stream, events, """ Verify that only the last event per uid is actually handled. """ # Override the default timeouts to make the tests faster. - mocker.patch('kopf.reactor.queueing.WORKER_IDLE_TIMEOUT', 0.5) - mocker.patch('kopf.reactor.queueing.WORKER_BATCH_WINDOW', 0.1) - mocker.patch('kopf.reactor.queueing.WORKER_EXIT_TIMEOUT', 0.5) + mocker.patch('kopf.config.WorkersConfig.worker_idle_timeout', 0.5) + mocker.patch('kopf.config.WorkersConfig.worker_batch_window', 0.1) + mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) # Inject the events of unique objects - to produce few queues/workers. stream.return_value = iter(events) @@ -132,8 +132,8 @@ async def test_event_batching(mocker, resource, handler, timer, stream, events, # Significantly less than the queue getting timeout, but sufficient to run. # 2 <= 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough. - from kopf.reactor.queueing import WORKER_BATCH_WINDOW - assert timer.seconds < WORKER_BATCH_WINDOW + CODE_OVERHEAD + from kopf.config import WorkersConfig + assert timer.seconds < WorkersConfig.worker_batch_window + CODE_OVERHEAD # Was the handler called at all? Awaited as needed for async fns? assert handler.awaited @@ -165,9 +165,9 @@ async def test_event_batching(mocker, resource, handler, timer, stream, events, async def test_garbage_collection_of_queues(mocker, stream, events, unique, worker_spy): # Override the default timeouts to make the tests faster. - mocker.patch('kopf.reactor.queueing.WORKER_IDLE_TIMEOUT', 0.5) - mocker.patch('kopf.reactor.queueing.WORKER_BATCH_WINDOW', 0.1) - mocker.patch('kopf.reactor.queueing.WORKER_EXIT_TIMEOUT', 0.5) + mocker.patch('kopf.config.WorkersConfig.worker_idle_timeout', 0.5) + mocker.patch('kopf.config.WorkersConfig.worker_batch_window', 0.1) + mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) # Inject the events of unique objects - to produce few queues/workers. stream.return_value = iter(events) @@ -187,9 +187,9 @@ async def test_garbage_collection_of_queues(mocker, stream, events, unique, work # Give the workers some time to finish waiting for the events. # Once the idle timeout, they will exit and gc their individual queues. - from kopf.reactor.queueing import WORKER_IDLE_TIMEOUT, WORKER_BATCH_WINDOW - await asyncio.sleep(WORKER_BATCH_WINDOW) # depleting the queues. - await asyncio.sleep(WORKER_IDLE_TIMEOUT) # idling on empty queues. + from kopf.config import WorkersConfig + await asyncio.sleep(WorkersConfig.worker_batch_window) # depleting the queues. + await asyncio.sleep(WorkersConfig.worker_idle_timeout) # idling on empty queues. await asyncio.sleep(CODE_OVERHEAD) # The mutable(!) queues dict is now empty, i.e. garbage-collected. From f62446ff8f93e9d763189a4c7096af9744518a52 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Fri, 14 Jun 2019 12:09:48 +0300 Subject: [PATCH 02/17] Synchronous event interface saved, events loglevel configuration added. --- examples/04-events/example.py | 15 +++++------ examples/99-all-at-once/example.py | 8 +++--- kopf/config.py | 13 ++++++++++ kopf/events.py | 40 ++++++++++++++++++++++++------ kopf/reactor/handling.py | 12 ++++----- 5 files changed, 61 insertions(+), 27 deletions(-) diff --git a/examples/04-events/example.py b/examples/04-events/example.py index b3a8c52f..5cfa32ed 100644 --- a/examples/04-events/example.py +++ b/examples/04-events/example.py @@ -9,17 +9,14 @@ @kopf.on.create('zalando.org', 'v1', 'kopfexamples') def create_fn(body, **kwargs): - asyncio.wait([ - # The all-purpose function for the vent creation. - kopf.event(body, type="SomeType", reason="SomeReason", message="Some message"), + # The all-purpose function for the vent creation. + kopf.event(body, type="SomeType", reason="SomeReason", message="Some message") - # The shortcuts for the conventional events and common cases. - kopf.info(body, reason="SomeReason", message="Some message"), - kopf.warn(body, reason="SomeReason", message="Some message"), - ]) + # The shortcuts for the conventional events and common cases. + kopf.info(body, reason="SomeReason", message="Some message") + kopf.warn(body, reason="SomeReason", message="Some message") try: raise RuntimeError("Exception text.") except: - loop = asyncio.get_event_loop() - loop.run_until_complete(kopf.exception(body, reason="SomeReason", message="Some exception:")) + kopf.exception(body, reason="SomeReason", message="Some exception:") diff --git a/examples/99-all-at-once/example.py b/examples/99-all-at-once/example.py index 5f9991b6..d958294d 100644 --- a/examples/99-all-at-once/example.py +++ b/examples/99-all-at-once/example.py @@ -17,11 +17,9 @@ def create_1(body, meta, spec, status, **kwargs): children = _create_children(owner=body) - asyncio.wait([ - kopf.info(body, reason='AnyReason'), - kopf.event(body, type='Warning', reason='SomeReason', message="Cannot do something"), - kopf.event(children, type='Normal', reason='SomeReason', message="Created as part of the job1step"), - ]) + kopf.info(body, reason='AnyReason') + kopf.event(body, type='Warning', reason='SomeReason', message="Cannot do something") + kopf.event(children, type='Normal', reason='SomeReason', message="Created as part of the job1step") return {'job1-status': 100} diff --git a/kopf/config.py b/kopf/config.py index 780fd34a..7c7a9ebb 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -11,6 +11,12 @@ format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' +LOGLEVEL_INFO = logging.INFO +LOGLEVEL_WARNING = logging.WARNING +LOGLEVEL_ERROR = logging.ERROR +LOGLEVEL_CRITICAL = logging.CRITICAL + + class LoginError(click.ClickException): """ Raised when the operator cannot login to the API. """ @@ -79,6 +85,13 @@ def configure(debug=None, verbose=None, quiet=None): loop.set_debug(debug) +class EventsConfig: + """Used to configure events sending behaviour""" + + """What events should be logged""" + events_loglevel = LOGLEVEL_INFO + + class WorkersConfig: """Used as single point of configuration for kopf.reactor""" diff --git a/kopf/events.py b/kopf/events.py index 29f54743..368ceb64 100644 --- a/kopf/events.py +++ b/kopf/events.py @@ -11,13 +11,15 @@ TODO """ +import asyncio import sys +from kopf import config from kopf.k8s import events # TODO: rename it it kopf.log()? kopf.events.log()? kopf.events.warn()? -async def event(obj, *, type, reason, message=''): +async def event_async(obj, *, type, reason, message=''): """ Issue an event for the object. """ @@ -30,17 +32,41 @@ async def event(obj, *, type, reason, message=''): # Shortcuts for the only two officially documented event types as of now. # However, any arbitrary strings can be used as an event type to the base function. -async def info(obj, *, reason, message=''): - return await event(obj, reason=reason, message=message, type='Normal') +async def info_async(obj, *, reason, message=''): + if config.EventsConfig.events_loglevel > config.LOGLEVEL_INFO: + return + await event_async(obj, reason=reason, message=message, type='Normal') -async def warn(obj, *, reason, message=''): - return await event(obj, reason=reason, message=message, type='Warning') +async def warn_async(obj, *, reason, message=''): + if config.EventsConfig.events_loglevel > config.LOGLEVEL_WARNING: + return + await event_async(obj, reason=reason, message=message, type='Warning') -async def exception(obj, *, reason='', message='', exc=None): +async def exception_async(obj, *, reason='', message='', exc=None): + if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR: + return + if exc is None: _, exc, _ = sys.exc_info() reason = reason if reason else type(exc).__name__ message = f'{message} {exc}' if message else f'{exc}' - return await event(obj, reason=reason, message=message, type='Error') + await event_async(obj, reason=reason, message=message, type='Error') + + +# Next 4 funcs are just synchronous interface for async event functions. +def event(obj, *, type, reason, message=''): + asyncio.wait_for(event_async(obj, type=type, reason=reason, message=message), timeout=2) + + +def info(obj, *, reason, message=''): + asyncio.wait_for(info_async(obj, reason=reason, message=message), timeout=2) + + +def warn(obj, *, reason, message=''): + asyncio.wait_for(warn_async(obj, reason=reason, message=message), timeout=2) + + +def exception(obj, *, reason='', message='', exc=None): + asyncio.wait_for(exception_async(obj, reason=reason, message=message, exc=exc), timeout=2) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 48eb1185..1d0afd8b 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -200,7 +200,7 @@ async def handle_cause( done = False else: logger.info(f"All handlers succeeded for {title}.") - events.info(cause.body, reason='Success', message=f"All handlers succeeded for {title}.") + await events.info_async(cause.body, reason='Success', message=f"All handlers succeeded for {title}.") done = True else: skip = True @@ -378,14 +378,14 @@ async def _execute( # Definitely retriable error, no matter what is the error-reaction mode. except HandlerRetryError as e: logger.exception(f"Handler {handler.id!r} failed with a retry exception. Will retry.") - await events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") + await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay) handlers_left.append(handler) # Definitely fatal error, no matter what is the error-reaction mode. except HandlerFatalError as e: logger.exception(f"Handler {handler.id!r} failed with a fatal exception. Will stop.") - await events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? @@ -393,19 +393,19 @@ async def _execute( except Exception as e: if retry_on_errors: logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.") - await events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") + await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY) handlers_left.append(handler) else: logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.") - await events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? # No errors means the handler should be excluded from future runs in this reaction cycle. else: logger.info(f"Handler {handler.id!r} succeeded.") - await events.info(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.") + await events.info_async(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.") status.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result) # Provoke the retry of the handling cycle if there were any unfinished handlers, From ae39e11a03174fbc76074b7b8c40b043528ee16a Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Fri, 14 Jun 2019 22:35:14 +0300 Subject: [PATCH 03/17] fix ValueError Set of coroutines/Futures is empty. --- kopf/reactor/queueing.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index b9f54069..dfe2b422 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -239,8 +239,11 @@ def run( # The errors in the cancellation stage will be ignored anyway (never re-raised below). for task in pending: task.cancel() - cancelled, pending = loop.run_until_complete(asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)) - assert not pending # must be empty by now, the tasks are either done or cancelled. + if pending: + cancelled, pending = loop.run_until_complete(asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)) + assert not pending # must be empty by now, the tasks are either done or cancelled. + else: + cancelled = [] # Check the results of the non-cancelled tasks, and re-raise of there were any exceptions. # The cancelled tasks are not re-raised, as it is a normal flow for the "first-completed" run. From 59c82a9932c2d909aa97d04d9319c60e980525cf Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 17 Jun 2019 13:43:10 +0300 Subject: [PATCH 04/17] fix import style on PR comments --- examples/04-events/example.py | 2 -- examples/99-all-at-once/example.py | 1 - kopf/k8s/events.py | 4 ++-- kopf/k8s/patching.py | 4 ++-- kopf/reactor/invocation.py | 4 ++-- kopf/reactor/queueing.py | 16 ++++++++++------ tests/reactor/test_queueing.py | 10 +++++----- 7 files changed, 21 insertions(+), 20 deletions(-) diff --git a/examples/04-events/example.py b/examples/04-events/example.py index 5cfa32ed..f4666ed6 100644 --- a/examples/04-events/example.py +++ b/examples/04-events/example.py @@ -1,8 +1,6 @@ """ Send the custom events for the handled or other objects. """ -import asyncio - import kopf diff --git a/examples/99-all-at-once/example.py b/examples/99-all-at-once/example.py index d958294d..384d49f4 100644 --- a/examples/99-all-at-once/example.py +++ b/examples/99-all-at-once/example.py @@ -1,7 +1,6 @@ """ Kubernetes operator example: all the features at once (for debugging & testing). """ -import asyncio import pprint import time diff --git a/kopf/k8s/events.py b/kopf/k8s/events.py index e9f0a782..18d958f2 100644 --- a/kopf/k8s/events.py +++ b/kopf/k8s/events.py @@ -6,7 +6,7 @@ import kubernetes.client.rest -from kopf.config import WorkersConfig +from kopf import config logger = logging.getLogger(__name__) @@ -14,7 +14,7 @@ CUT_MESSAGE_INFIX = '...' -event_executor = concurrent.futures.ThreadPoolExecutor(max_workers=WorkersConfig.synchronous_event_post_workers_limit) +event_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_event_post_workers_limit) async def post_event(*, obj, type, reason, message=''): diff --git a/kopf/k8s/patching.py b/kopf/k8s/patching.py index f8e36a4a..7f2c30a5 100644 --- a/kopf/k8s/patching.py +++ b/kopf/k8s/patching.py @@ -4,9 +4,9 @@ import kubernetes -from kopf.config import WorkersConfig +from kopf import config -patch_executor = concurrent.futures.ThreadPoolExecutor(max_workers=WorkersConfig.synchronous_patch_workers_limit) +patch_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_patch_workers_limit) async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): diff --git a/kopf/reactor/invocation.py b/kopf/reactor/invocation.py index d6a519bf..d2a30535 100644 --- a/kopf/reactor/invocation.py +++ b/kopf/reactor/invocation.py @@ -13,9 +13,9 @@ # The executor for the sync-handlers (i.e. regular functions). # TODO: make the limits if sync-handlers configurable? -from kopf.config import WorkersConfig +from kopf import config -executor = concurrent.futures.ThreadPoolExecutor(max_workers=WorkersConfig.synchronous_handlers_threadpool_limit) +executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_handlers_threadpool_limit) # executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index dfe2b422..36b8c0f3 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -31,7 +31,7 @@ import aiojobs -from kopf.config import WorkersConfig +from kopf import config from kopf.k8s import watching from kopf.reactor import handling from kopf.reactor import lifecycles @@ -69,7 +69,7 @@ async def watcher( # All per-object workers are handled as fire-and-forget jobs via the scheduler, # and communicated via the per-object event queues. - scheduler = await aiojobs.create_scheduler(limit=WorkersConfig.queue_workers_limit) + scheduler = await aiojobs.create_scheduler(limit=config.WorkersConfig.queue_workers_limit) queues = {} try: # Either use the existing object's queue, or create a new one together with the per-object job. @@ -120,14 +120,16 @@ async def worker( # If the queue is filled, use the latest event only (within the short timeframe). # If an EOS marker is received, handle the last real event, then finish the worker ASAP. try: - event = await asyncio.wait_for(queue.get(), timeout=WorkersConfig.worker_idle_timeout) + event = await asyncio.wait_for(queue.get(), timeout=config.WorkersConfig.worker_idle_timeout) except asyncio.TimeoutError: break else: try: while True: prev_event = event - next_event = await asyncio.wait_for(queue.get(), timeout=WorkersConfig.worker_batch_window) + next_event = await asyncio.wait_for( + queue.get(), timeout=config.WorkersConfig.worker_batch_window + ) shouldstop = shouldstop or next_event is EOS event = prev_event if next_event is EOS else next_event except asyncio.TimeoutError: @@ -264,8 +266,10 @@ async def _wait_for_depletion(*, scheduler, queues): # Wait for the queues to be depleted, but only if there are some workers running. # Continue with the tasks termination if the timeout is reached, no matter the queues. started = time.perf_counter() - while queues and scheduler.active_count and time.perf_counter() - started < WorkersConfig.worker_exit_timeout: - await asyncio.sleep(WorkersConfig.worker_exit_timeout / 100.) + while queues and \ + scheduler.active_count and \ + time.perf_counter() - started < config.WorkersConfig.worker_exit_timeout: + await asyncio.sleep(config.WorkersConfig.worker_exit_timeout / 100.) # The last check if the termination is going to be graceful or not. if queues: diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index 43888c66..701c4e7e 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -132,8 +132,8 @@ async def test_event_batching(mocker, resource, handler, timer, stream, events, # Significantly less than the queue getting timeout, but sufficient to run. # 2 <= 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough. - from kopf.config import WorkersConfig - assert timer.seconds < WorkersConfig.worker_batch_window + CODE_OVERHEAD + from kopf import config + assert timer.seconds < config.WorkersConfig.worker_batch_window + CODE_OVERHEAD # Was the handler called at all? Awaited as needed for async fns? assert handler.awaited @@ -187,9 +187,9 @@ async def test_garbage_collection_of_queues(mocker, stream, events, unique, work # Give the workers some time to finish waiting for the events. # Once the idle timeout, they will exit and gc their individual queues. - from kopf.config import WorkersConfig - await asyncio.sleep(WorkersConfig.worker_batch_window) # depleting the queues. - await asyncio.sleep(WorkersConfig.worker_idle_timeout) # idling on empty queues. + from kopf import config + await asyncio.sleep(config.WorkersConfig.worker_batch_window) # depleting the queues. + await asyncio.sleep(config.WorkersConfig.worker_idle_timeout) # idling on empty queues. await asyncio.sleep(CODE_OVERHEAD) # The mutable(!) queues dict is now empty, i.e. garbage-collected. From bebaa18f3899a8e8e6379add9cbe554d9d436283 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 17 Jun 2019 13:53:21 +0300 Subject: [PATCH 05/17] fix loglevels --- kopf/config.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kopf/config.py b/kopf/config.py index 7c7a9ebb..2dc3485c 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -11,10 +11,10 @@ format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' -LOGLEVEL_INFO = logging.INFO -LOGLEVEL_WARNING = logging.WARNING -LOGLEVEL_ERROR = logging.ERROR -LOGLEVEL_CRITICAL = logging.CRITICAL +LOGLEVEL_INFO = 20 +LOGLEVEL_WARNING = 30 +LOGLEVEL_ERROR = 40 +LOGLEVEL_CRITICAL = 50 class LoginError(click.ClickException): From 1cc6951db04a9136851e01700768d47039bb2e6e Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Mon, 17 Jun 2019 13:57:56 +0300 Subject: [PATCH 06/17] fix event posting timeout --- kopf/events.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kopf/events.py b/kopf/events.py index 368ceb64..dd94f4fd 100644 --- a/kopf/events.py +++ b/kopf/events.py @@ -57,16 +57,16 @@ async def exception_async(obj, *, reason='', message='', exc=None): # Next 4 funcs are just synchronous interface for async event functions. def event(obj, *, type, reason, message=''): - asyncio.wait_for(event_async(obj, type=type, reason=reason, message=message), timeout=2) + asyncio.wait_for(event_async(obj, type=type, reason=reason, message=message), timeout=None) def info(obj, *, reason, message=''): - asyncio.wait_for(info_async(obj, reason=reason, message=message), timeout=2) + asyncio.wait_for(info_async(obj, reason=reason, message=message), timeout=None) def warn(obj, *, reason, message=''): - asyncio.wait_for(warn_async(obj, reason=reason, message=message), timeout=2) + asyncio.wait_for(warn_async(obj, reason=reason, message=message), timeout=None) def exception(obj, *, reason='', message='', exc=None): - asyncio.wait_for(exception_async(obj, reason=reason, message=message, exc=exc), timeout=2) + asyncio.wait_for(exception_async(obj, reason=reason, message=message, exc=exc), timeout=None) From 13eda0fea4f16611a09fc005b7ebd8f4d8b9f831 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Tue, 18 Jun 2019 11:32:58 +0300 Subject: [PATCH 07/17] fix patching tests --- tests/k8s/test_patching.py | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/tests/k8s/test_patching.py b/tests/k8s/test_patching.py index 6a7f17b0..e0838bdc 100644 --- a/tests/k8s/test_patching.py +++ b/tests/k8s/test_patching.py @@ -6,16 +6,13 @@ from kopf.k8s.patching import patch_obj -def test_by_name_clustered(client_mock, resource): +async def test_by_name_clustered(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object - task = asyncio.create_task(patch_obj(resource=resource, namespace=None, name='name1', patch=patch)) - loop = asyncio.get_event_loop() - loop.run_until_complete(task) - res = task.result + res = await patch_obj(resource=resource, namespace=None, name='name1', patch=patch) assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -29,16 +26,13 @@ def test_by_name_clustered(client_mock, resource): )] -def test_by_name_namespaced(client_mock, resource): +async def test_by_name_namespaced(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_cluster_custom_object mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object - task = asyncio.create_task(patch_obj(resource=resource, namespace='ns1', name='name1', patch=patch)) - loop = asyncio.get_event_loop() - loop.run_until_complete(task) - res = task.result + res = await patch_obj(resource=resource, namespace='ns1', name='name1', patch=patch) assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -53,17 +47,14 @@ def test_by_name_namespaced(client_mock, resource): )] -def test_by_body_clustered(client_mock, resource): +async def test_by_body_clustered(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object body = {'metadata': {'name': 'name1'}} - task = asyncio.create_task(patch_obj(resource=resource, body=body, patch=patch)) - loop = asyncio.get_event_loop() - loop.run_until_complete(task) - res = task.result + res = await patch_obj(resource=resource, body=body, patch=patch) assert res is None # never return any k8s-client specific things assert not sidefn_mock.called @@ -102,7 +93,7 @@ def test_by_body_namespaced(client_mock, resource): )] -def test_raises_when_body_conflicts_with_namespace(client_mock, resource): +async def test_raises_when_body_conflicts_with_namespace(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object @@ -110,14 +101,13 @@ def test_raises_when_body_conflicts_with_namespace(client_mock, resource): body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): - loop = asyncio.get_event_loop() - loop.run_until_complete(patch_obj(resource=resource, body=body, namespace='ns1', patch=patch)) + await patch_obj(resource=resource, body=body, namespace='ns1', patch=patch) assert not sidefn_mock.called assert not mainfn_mock.called -def test_raises_when_body_conflicts_with_name(client_mock, resource): +async def test_raises_when_body_conflicts_with_name(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object @@ -125,13 +115,13 @@ def test_raises_when_body_conflicts_with_name(client_mock, resource): body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): - patch_obj(resource=resource, body=body, name='name1', patch=patch) + await patch_obj(resource=resource, body=body, name='name1', patch=patch) assert not sidefn_mock.called assert not mainfn_mock.called -def test_raises_when_body_conflicts_with_ids(client_mock, resource): +async def test_raises_when_body_conflicts_with_ids(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object @@ -139,7 +129,7 @@ def test_raises_when_body_conflicts_with_ids(client_mock, resource): body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): - patch_obj(resource=resource, body=body, namespace='ns1', name='name1', patch=patch) + await patch_obj(resource=resource, body=body, namespace='ns1', name='name1', patch=patch) assert not sidefn_mock.called assert not mainfn_mock.called From 09ecbeeeda79fa7456fd8f84aeadb314b7983aaf Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Tue, 18 Jun 2019 11:35:09 +0300 Subject: [PATCH 08/17] try another way to retrieve parent thread eventloop --- kopf/k8s/events.py | 5 +---- kopf/k8s/patching.py | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/kopf/k8s/events.py b/kopf/k8s/events.py index 18d958f2..47499c4f 100644 --- a/kopf/k8s/events.py +++ b/kopf/k8s/events.py @@ -65,10 +65,7 @@ async def post_event(*, obj, type, reason, message=''): ) api = kubernetes.client.CoreV1Api() - loop = asyncio.get_event_loop() - if not loop: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = asyncio.get_running_loop() try: await loop.run_in_executor( diff --git a/kopf/k8s/patching.py b/kopf/k8s/patching.py index 7f2c30a5..740fad96 100644 --- a/kopf/k8s/patching.py +++ b/kopf/k8s/patching.py @@ -39,9 +39,6 @@ async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): if namespace is not None: request_kwargs['namespace'] = namespace patch_func = api.patch_namespaced_custom_object - loop = asyncio.get_event_loop() - if not loop: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = asyncio.get_running_loop() await loop.run_in_executor(patch_executor, functools.partial(patch_func, **request_kwargs)) From c1c4d9bf85b476aa5c6813f45860bdb9bd77eed9 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Tue, 18 Jun 2019 12:20:38 +0300 Subject: [PATCH 09/17] fix docstrings for kopf.config --- kopf/config.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/kopf/config.py b/kopf/config.py index 2dc3485c..b58bd89f 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -12,9 +12,13 @@ LOGLEVEL_INFO = 20 +""" Event loglevel to log all events. """ LOGLEVEL_WARNING = 30 +""" Event loglevel to log all events except informational. """ LOGLEVEL_ERROR = 40 +""" Event loglevel to log only errors and critical events. """ LOGLEVEL_CRITICAL = 50 +""" Event loglevel to log only critical events(basically - no events). """ class LoginError(click.ClickException): @@ -86,32 +90,36 @@ def configure(debug=None, verbose=None, quiet=None): class EventsConfig: - """Used to configure events sending behaviour""" + """ + Used to configure events sending behaviour. + """ - """What events should be logged""" events_loglevel = LOGLEVEL_INFO + """ What events should be logged. """ class WorkersConfig: - """Used as single point of configuration for kopf.reactor""" + """ + Used as single point of configuration for kopf.reactor. + """ - """How many workers can be running simultaneously on event creation operations""" synchronous_event_post_workers_limit = None + """ How many workers can be running simultaneously on event creation operations. """ - """How many workers can be running simultaneously on patch operations""" synchronous_patch_workers_limit = None + """ How many workers can be running simultaneously on patch operations. """ - """How many workers can be running simultaneously on per-object event queue""" queue_workers_limit = None # if None, there is no limits to workers number + """ How many workers can be running simultaneously on per-object event queue. """ - """How many threads in total can be running simultaneously to handle non-async handler functions""" synchronous_handlers_threadpool_limit = None # if None, calculated by ThreadPoolExecutor based on cpu count + """ How many threads in total can be running simultaneously to handle non-async handler functions. """ - """How long does a worker can idle before exiting and garbage-collecting.""" worker_idle_timeout = 5.0 + """ How long does a worker can idle before exiting and garbage-collecting.""" - """How fast/slow does a worker deplete the queue when an event is received.""" worker_batch_window = 0.1 + """ How fast/slow does a worker deplete the queue when an event is received.""" - """How long does a worker can work on watcher exit before being cancelled. """ worker_exit_timeout = 2.0 + """ How long does a worker can work on watcher exit before being cancelled. """ From 16a3a73e1e5c86de257d00be1192ff3acedb198e Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Tue, 18 Jun 2019 12:52:32 +0300 Subject: [PATCH 10/17] more fix for tests --- tests/k8s/test_events.py | 20 ++++++++++---------- tests/k8s/test_patching.py | 7 ++----- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/tests/k8s/test_events.py b/tests/k8s/test_events.py index 6c2f63ed..fe1b2d0f 100644 --- a/tests/k8s/test_events.py +++ b/tests/k8s/test_events.py @@ -4,7 +4,7 @@ from kopf.k8s.events import post_event -def test_posting(client_mock): +async def test_posting(client_mock): result = object() apicls_mock = client_mock.CoreV1Api apicls_mock.return_value.create_namespaced_event.return_value = result @@ -15,7 +15,7 @@ def test_posting(client_mock): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} - post_event(obj=obj, type='type', reason='reason', message='message') + await post_event(obj=obj, type='type', reason='reason', message='message') assert postfn_mock.called assert postfn_mock.call_count == 1 @@ -36,7 +36,7 @@ def test_posting(client_mock): assert event.involved_object['uid'] == 'uid' -def test_type_is_v1_not_v1beta1(client_mock): +async def test_type_is_v1_not_v1beta1(client_mock): apicls_mock = client_mock.CoreV1Api postfn_mock = apicls_mock.return_value.create_namespaced_event @@ -45,14 +45,14 @@ def test_type_is_v1_not_v1beta1(client_mock): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} - post_event(obj=obj, type='type', reason='reason', message='message') + await post_event(obj=obj, type='type', reason='reason', message='message') event = postfn_mock.call_args_list[0][1]['body'] assert isinstance(event, client_mock.V1Event) assert not isinstance(event, client_mock.V1beta1Event) -def test_api_errors_logged_but_suppressed(client_mock, assert_logs): +async def test_api_errors_logged_but_suppressed(client_mock, assert_logs): error = client_mock.rest.ApiException('boo!') apicls_mock = client_mock.CoreV1Api apicls_mock.return_value.create_namespaced_event.side_effect = error @@ -63,7 +63,7 @@ def test_api_errors_logged_but_suppressed(client_mock, assert_logs): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} - post_event(obj=obj, type='type', reason='reason', message='message') + await post_event(obj=obj, type='type', reason='reason', message='message') assert postfn_mock.called assert_logs([ @@ -71,7 +71,7 @@ def test_api_errors_logged_but_suppressed(client_mock, assert_logs): ]) -def test_regular_errors_escalate(client_mock): +async def test_regular_errors_escalate(client_mock): error = Exception('boo!') apicls_mock = client_mock.CoreV1Api apicls_mock.return_value.create_namespaced_event.side_effect = error @@ -83,12 +83,12 @@ def test_regular_errors_escalate(client_mock): 'uid': 'uid'}} with pytest.raises(Exception) as excinfo: - post_event(obj=obj, type='type', reason='reason', message='message') + await post_event(obj=obj, type='type', reason='reason', message='message') assert excinfo.value is error -def test_message_is_cut_to_max_length(client_mock): +async def test_message_is_cut_to_max_length(client_mock): result = object() apicls_mock = client_mock.CoreV1Api apicls_mock.return_value.create_namespaced_event.return_value = result @@ -100,7 +100,7 @@ def test_message_is_cut_to_max_length(client_mock): 'name': 'name', 'uid': 'uid'}} message = 'start' + ('x' * 2048) + 'end' - post_event(obj=obj, type='type', reason='reason', message=message) + await post_event(obj=obj, type='type', reason='reason', message=message) event = postfn_mock.call_args_list[0][1]['body'] assert len(event.message) <= 1024 # max supported API message length diff --git a/tests/k8s/test_patching.py b/tests/k8s/test_patching.py index e0838bdc..f3598843 100644 --- a/tests/k8s/test_patching.py +++ b/tests/k8s/test_patching.py @@ -68,17 +68,14 @@ async def test_by_body_clustered(client_mock, resource): )] -def test_by_body_namespaced(client_mock, resource): +async def test_by_body_namespaced(client_mock, resource): patch = object() apicls_mock = client_mock.CustomObjectsApi sidefn_mock = apicls_mock.return_value.patch_cluster_custom_object mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} - task = asyncio.create_task(patch_obj(resource=resource, body=body, patch=patch)) - loop = asyncio.get_event_loop() - loop.run_until_complete(task) - res = task.result + res = await patch_obj(resource=resource, body=body, patch=patch) assert res is None # never return any k8s-client specific things assert not sidefn_mock.called From a071aa3ca0d2fad9f96fa90f459abf9518d56463 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Tue, 18 Jun 2019 13:48:23 +0300 Subject: [PATCH 11/17] commit to trigger pr bot --- kopf/reactor/queueing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index 36b8c0f3..045affc5 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -245,6 +245,7 @@ def run( cancelled, pending = loop.run_until_complete(asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)) assert not pending # must be empty by now, the tasks are either done or cancelled. else: + # when pending list is empty, let's say cancelled is empty too cancelled = [] # Check the results of the non-cancelled tasks, and re-raise of there were any exceptions. From 3ce9715cc5684fa2c7fa685b0fe6ce00131d0cbd Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Wed, 19 Jun 2019 12:43:54 +0300 Subject: [PATCH 12/17] really garbage commit to trigger ci --- Pipfile | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 Pipfile diff --git a/Pipfile b/Pipfile new file mode 100644 index 00000000..fac9def2 --- /dev/null +++ b/Pipfile @@ -0,0 +1,12 @@ +[[source]] +name = "pypi" +url = "https://pypi.org/simple" +verify_ssl = true + +[dev-packages] + +[packages] +kopf = {path = "."} + +[requires] +python_version = "3.7" From 0aee5ca210063ecb554885bd0ac40b84a7cc8734 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Wed, 19 Jun 2019 12:44:24 +0300 Subject: [PATCH 13/17] really garbage commit to trigger ci --- Pipfile | 12 ------------ 1 file changed, 12 deletions(-) delete mode 100644 Pipfile diff --git a/Pipfile b/Pipfile deleted file mode 100644 index fac9def2..00000000 --- a/Pipfile +++ /dev/null @@ -1,12 +0,0 @@ -[[source]] -name = "pypi" -url = "https://pypi.org/simple" -verify_ssl = true - -[dev-packages] - -[packages] -kopf = {path = "."} - -[requires] -python_version = "3.7" From 031da6794ce5f6ee61d26db7e9057564ae85bc29 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Thu, 20 Jun 2019 12:53:05 +0300 Subject: [PATCH 14/17] update docs on configuration --- docs/configuring.rst | 60 ++++++++++++++++++++++++++++++++++++++++++++ docs/index.rst | 1 + kopf/config.py | 4 +-- kopf/k8s/events.py | 2 +- kopf/k8s/patching.py | 2 +- 5 files changed, 65 insertions(+), 4 deletions(-) create mode 100644 docs/configuring.rst diff --git a/docs/configuring.rst b/docs/configuring.rst new file mode 100644 index 00000000..1127121d --- /dev/null +++ b/docs/configuring.rst @@ -0,0 +1,60 @@ +================ +Configuration +================ + +There are tools to configure some of kopf functionality, like asynchronous +tasks behaviour and logging events. + +.. note:: + All configuration should be done before importing kopf modules. + + +Configure logging events +================= + +`kopf.config.EventsConfig` allows to set what types of kopf logs should be +reflected in events. + +Loglevels are: + +* ``kopf.config.LOGLEVEL_INFO`` +* ``kopf.config.LOGLEVEL_WARNING`` +* ``kopf.config.LOGLEVEL_ERROR`` +* ``kopf.config.LOGLEVEL_CRITICAL`` + +.. code-block:: python + :caption: test_example_operator.py + + from kopf import config + + # Now kopf will send events only when error or critical occasion happens + config.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR + + import kopf + + @kopf.on.create('zalando.org', 'v1', 'ephemeralvolumeclaims') + def create_fn(body, **kwargs): + print(f"A handler is called with body: {body}") + + +Configure Workers +================= + +`kopf.config.WorkersConfig` allows to set numbers of workers, launch periods, +and timeouts for many kinds of tasks. + +.. code-block:: python + :caption: test_example_operator.py + + from kopf import config + + # Let's set how many workers can be running simultaneously on per-object event queue + config.WorkersConfig.queue_workers_limit = 10 + + import kopf + + @kopf.on.create('zalando.org', 'v1', 'ephemeralvolumeclaims') + def create_fn(body, **kwargs): + print(f"A handler is called with body: {body}") + + diff --git a/docs/index.rst b/docs/index.rst index 90c34be5..cc8edab6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -36,6 +36,7 @@ Kopf: Kubernetes Operators Framework errors events testing + configuring .. toctree:: :maxdepth: 2 diff --git a/kopf/config.py b/kopf/config.py index b58bd89f..e921c69b 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -103,10 +103,10 @@ class WorkersConfig: Used as single point of configuration for kopf.reactor. """ - synchronous_event_post_workers_limit = None + synchronous_event_post_threadpool_limit = None """ How many workers can be running simultaneously on event creation operations. """ - synchronous_patch_workers_limit = None + synchronous_patch_threadpool_limit = None """ How many workers can be running simultaneously on patch operations. """ queue_workers_limit = None # if None, there is no limits to workers number diff --git a/kopf/k8s/events.py b/kopf/k8s/events.py index 47499c4f..90d8e814 100644 --- a/kopf/k8s/events.py +++ b/kopf/k8s/events.py @@ -14,7 +14,7 @@ CUT_MESSAGE_INFIX = '...' -event_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_event_post_workers_limit) +event_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_event_post_threadpool_limit) async def post_event(*, obj, type, reason, message=''): diff --git a/kopf/k8s/patching.py b/kopf/k8s/patching.py index 740fad96..5fdbd59a 100644 --- a/kopf/k8s/patching.py +++ b/kopf/k8s/patching.py @@ -6,7 +6,7 @@ from kopf import config -patch_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_patch_workers_limit) +patch_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_patch_threadpool_limit) async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): From 9e52ea604b9aba152adafc623abe2e9976487c5e Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Thu, 20 Jun 2019 17:08:26 +0300 Subject: [PATCH 15/17] fix configuration - now really can be configured even after kopf is imported --- docs/configuring.rst | 29 +++++------------------------ kopf/__init__.py | 6 ++++++ kopf/config.py | 38 +++++++++++++++++++++++++++----------- kopf/k8s/events.py | 7 ++----- kopf/k8s/patching.py | 5 +---- kopf/reactor/invocation.py | 10 +--------- 6 files changed, 42 insertions(+), 53 deletions(-) diff --git a/docs/configuring.rst b/docs/configuring.rst index 1127121d..a8695b74 100644 --- a/docs/configuring.rst +++ b/docs/configuring.rst @@ -5,12 +5,9 @@ Configuration There are tools to configure some of kopf functionality, like asynchronous tasks behaviour and logging events. -.. note:: - All configuration should be done before importing kopf modules. - Configure logging events -================= +======================== `kopf.config.EventsConfig` allows to set what types of kopf logs should be reflected in events. @@ -23,19 +20,11 @@ Loglevels are: * ``kopf.config.LOGLEVEL_CRITICAL`` .. code-block:: python - :caption: test_example_operator.py - - from kopf import config - - # Now kopf will send events only when error or critical occasion happens - config.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR import kopf - @kopf.on.create('zalando.org', 'v1', 'ephemeralvolumeclaims') - def create_fn(body, **kwargs): - print(f"A handler is called with body: {body}") - + # Now kopf will send events only when error or critical occasion happens + kopf.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR Configure Workers ================= @@ -44,17 +33,9 @@ Configure Workers and timeouts for many kinds of tasks. .. code-block:: python - :caption: test_example_operator.py - - from kopf import config - - # Let's set how many workers can be running simultaneously on per-object event queue - config.WorkersConfig.queue_workers_limit = 10 import kopf - @kopf.on.create('zalando.org', 'v1', 'ephemeralvolumeclaims') - def create_fn(body, **kwargs): - print(f"A handler is called with body: {body}") - + # Let's set how many workers can be running simultaneously on per-object event queue + kopf.WorkersConfig.synchronous_tasks_threadpool_limit = 20 diff --git a/kopf/__init__.py b/kopf/__init__.py index f7c6e219..e35b75b5 100644 --- a/kopf/__init__.py +++ b/kopf/__init__.py @@ -12,6 +12,12 @@ from kopf.config import ( login, configure, + LOGLEVEL_INFO, + LOGLEVEL_WARNING, + LOGLEVEL_ERROR, + LOGLEVEL_CRITICAL, + EventsConfig, + WorkersConfig ) from kopf.events import ( event, diff --git a/kopf/config.py b/kopf/config.py index e921c69b..3666ab02 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -1,6 +1,8 @@ import asyncio +import concurrent.futures import logging +from typing import Optional import click import kubernetes @@ -103,23 +105,37 @@ class WorkersConfig: Used as single point of configuration for kopf.reactor. """ - synchronous_event_post_threadpool_limit = None - """ How many workers can be running simultaneously on event creation operations. """ + threadpool_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None - synchronous_patch_threadpool_limit = None - """ How many workers can be running simultaneously on patch operations. """ - - queue_workers_limit = None # if None, there is no limits to workers number + queue_workers_limit: Optional[int] = None # if None, there is no limits to workers number """ How many workers can be running simultaneously on per-object event queue. """ - synchronous_handlers_threadpool_limit = None # if None, calculated by ThreadPoolExecutor based on cpu count - """ How many threads in total can be running simultaneously to handle non-async handler functions. """ + synchronous_tasks_threadpool_limit: Optional[int] = None # if None, calculated by ThreadPoolExecutor based on cpu count + """ How many threads in total can be running simultaneously to handle any non-async tasks. """ - worker_idle_timeout = 5.0 + worker_idle_timeout: float = 5.0 """ How long does a worker can idle before exiting and garbage-collecting.""" - worker_batch_window = 0.1 + worker_batch_window: float = 0.1 """ How fast/slow does a worker deplete the queue when an event is received.""" - worker_exit_timeout = 2.0 + worker_exit_timeout: float = 2.0 """ How long does a worker can work on watcher exit before being cancelled. """ + + @staticmethod + def get_syn_executor() -> concurrent.futures.ThreadPoolExecutor: + if not WorkersConfig.threadpool_executor: + logging.debug('Setting up syn executor') + WorkersConfig.threadpool_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=WorkersConfig.synchronous_tasks_threadpool_limit + ) + return WorkersConfig.threadpool_executor + + @staticmethod + def set_synchronous_tasks_threadpool_limit(new_limit: int): + if new_limit < 1: + return + + WorkersConfig.synchronous_tasks_threadpool_limit = new_limit + if WorkersConfig.threadpool_executor: + WorkersConfig.threadpool_executor._max_workers = new_limit diff --git a/kopf/k8s/events.py b/kopf/k8s/events.py index 90d8e814..9cc47b18 100644 --- a/kopf/k8s/events.py +++ b/kopf/k8s/events.py @@ -1,5 +1,4 @@ import asyncio -import concurrent.futures import datetime import functools import logging @@ -14,9 +13,6 @@ CUT_MESSAGE_INFIX = '...' -event_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_event_post_threadpool_limit) - - async def post_event(*, obj, type, reason, message=''): """ Issue an event for the object. @@ -69,7 +65,8 @@ async def post_event(*, obj, type, reason, message=''): try: await loop.run_in_executor( - event_executor, functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body}) + config.WorkersConfig.get_syn_executor(), + functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body}) ) except kubernetes.client.rest.ApiException as e: # Events are helpful but auxiliary, they should not fail the handling cycle. diff --git a/kopf/k8s/patching.py b/kopf/k8s/patching.py index 5fdbd59a..2432af3a 100644 --- a/kopf/k8s/patching.py +++ b/kopf/k8s/patching.py @@ -1,13 +1,10 @@ import asyncio -import concurrent.futures import functools import kubernetes from kopf import config -patch_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_patch_threadpool_limit) - async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): """ @@ -41,4 +38,4 @@ async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): patch_func = api.patch_namespaced_custom_object loop = asyncio.get_running_loop() - await loop.run_in_executor(patch_executor, functools.partial(patch_func, **request_kwargs)) + await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), functools.partial(patch_func, **request_kwargs)) diff --git a/kopf/reactor/invocation.py b/kopf/reactor/invocation.py index d2a30535..642039ab 100644 --- a/kopf/reactor/invocation.py +++ b/kopf/reactor/invocation.py @@ -6,18 +6,12 @@ All of this goes via the same invocation logic and protocol. """ import asyncio -import concurrent.futures import contextvars import functools from typing import Callable -# The executor for the sync-handlers (i.e. regular functions). -# TODO: make the limits if sync-handlers configurable? from kopf import config -executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_handlers_threadpool_limit) -# executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) - async def invoke( fn: Callable, @@ -82,9 +76,7 @@ async def invoke( real_fn = functools.partial(context.run, real_fn) loop = asyncio.get_event_loop() - task = loop.run_in_executor(executor, real_fn) - await asyncio.wait([task]) - result = task.result() # re-raises + result = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), real_fn) return result From af3e1026c16ec2b8e5064ea708762d26219f7072 Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Thu, 20 Jun 2019 17:28:18 +0300 Subject: [PATCH 16/17] fix behaviour in case of incorrect config --- kopf/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kopf/config.py b/kopf/config.py index 3666ab02..7d435e89 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -134,7 +134,7 @@ def get_syn_executor() -> concurrent.futures.ThreadPoolExecutor: @staticmethod def set_synchronous_tasks_threadpool_limit(new_limit: int): if new_limit < 1: - return + raise ValueError('Can`t set threadpool limit lower than 1') WorkersConfig.synchronous_tasks_threadpool_limit = new_limit if WorkersConfig.threadpool_executor: From ba34911760f10537d8b5439e6392124ba99bc96b Mon Sep 17 00:00:00 2001 From: Dmitry Bazhal Date: Thu, 20 Jun 2019 17:31:48 +0300 Subject: [PATCH 17/17] docstring --- kopf/config.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kopf/config.py b/kopf/config.py index 7d435e89..bebe2ca6 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -111,7 +111,7 @@ class WorkersConfig: """ How many workers can be running simultaneously on per-object event queue. """ synchronous_tasks_threadpool_limit: Optional[int] = None # if None, calculated by ThreadPoolExecutor based on cpu count - """ How many threads in total can be running simultaneously to handle any non-async tasks. """ + """ How many threads in total can be running simultaneously to handle any non-async tasks.""" worker_idle_timeout: float = 5.0 """ How long does a worker can idle before exiting and garbage-collecting.""" @@ -133,6 +133,9 @@ def get_syn_executor() -> concurrent.futures.ThreadPoolExecutor: @staticmethod def set_synchronous_tasks_threadpool_limit(new_limit: int): + """ + Call this static method at any time to change synchronous_tasks_threadpool_limit in runtime. + """ if new_limit < 1: raise ValueError('Can`t set threadpool limit lower than 1')