diff --git a/kopf/__init__.py b/kopf/__init__.py
index 8eb568ec..d3c5f154 100644
--- a/kopf/__init__.py
+++ b/kopf/__init__.py
@@ -22,7 +22,7 @@
     EventsConfig,
     WorkersConfig
 )
-from kopf.events import (
+from kopf.engines.posting import (
     event,
     info,
     warn,
diff --git a/kopf/clients/events.py b/kopf/clients/events.py
index 9cc47b18..5554f67b 100644
--- a/kopf/clients/events.py
+++ b/kopf/clients/events.py
@@ -6,6 +6,7 @@
 import kubernetes.client.rest
 
 from kopf import config
+from kopf.structs import hierarchies
 
 logger = logging.getLogger(__name__)
 
@@ -13,13 +14,25 @@
 CUT_MESSAGE_INFIX = '...'
 
 
-async def post_event(*, obj, type, reason, message=''):
+async def post_event(*, obj=None, ref=None, type, reason, message=''):
     """
     Issue an event for the object.
+
+    This is where events can also be accumulated, aggregated, grouped,
+    and where the rate-limits should be maintained. It can (and should)
+    be done by the client library, as it is done in the Go client.
+    """
+    # Object reference - similar to the owner reference, but different.
+    if obj is not None and ref is not None:
+        raise TypeError("Only one of obj= and ref= is allowed for a posted event. Got both.")
+    if obj is None and ref is None:
+        raise TypeError("One of obj= and ref= is required for a posted event. Got none.")
+    if ref is None:
+        ref = hierarchies.build_object_reference(obj)
+
     now = datetime.datetime.utcnow()
-    namespace = obj['metadata']['namespace']
+    namespace = ref['namespace'] or 'default'
 
     # Prevent a common case of event posting errors by shortening the message.
     if len(message) > MAX_MESSAGE_LENGTH:
@@ -28,15 +41,6 @@ async def post_event(*, obj, type, reason, message=''):
         suffix = message[-MAX_MESSAGE_LENGTH // 2 + (len(infix) - len(infix) // 2):]
         message = f'{prefix}{infix}{suffix}'
 
-    # Object reference - similar to the owner reference, but different.
-    ref = dict(
-        apiVersion=obj['apiVersion'],
-        kind=obj['kind'],
-        name=obj['metadata']['name'],
-        uid=obj['metadata']['uid'],
-        namespace=obj['metadata']['namespace'],
-    )
-
     meta = kubernetes.client.V1ObjectMeta(
         namespace=namespace,
         generate_name='kopf-event-',
diff --git a/kopf/engines/posting.py b/kopf/engines/posting.py
new file mode 100644
index 00000000..1c476cd9
--- /dev/null
+++ b/kopf/engines/posting.py
@@ -0,0 +1,88 @@
+"""
+All the functions to write the Kubernetes events for the Kubernetes objects.
+
+They are used internally in the handling routines to show the progress,
+and can be used directly from the handlers to add arbitrary custom events.
+
+The actual k8s-event posting runs in the background,
+and posts the k8s-events as soon as they are queued.
+"""
+import asyncio
+import sys
+from contextvars import ContextVar
+from typing import Mapping, Text, NamedTuple
+
+from kopf import config
+from kopf.clients import events
+from kopf.structs import dicts
+from kopf.structs import hierarchies
+
+event_queue_var: ContextVar[asyncio.Queue] = ContextVar('event_queue_var')
+
+
+class K8sEvent(NamedTuple):
+    """
+    A single k8s-event to be posted, with all ref-information preserved.
+    It can exist and be posted even after the object is garbage-collected.
+ """ + ref: Mapping + type: Text + reason: Text + message: Text + + +def event(objs, *, type, reason, message=''): + queue = event_queue_var.get() + for obj in dicts.walk(objs): + ref = hierarchies.build_object_reference(obj) + event = K8sEvent(ref=ref, type=type, reason=reason, message=message) + queue.put_nowait(event) + + +def info(obj, *, reason, message=''): + if config.EventsConfig.events_loglevel > config.LOGLEVEL_INFO: + return + event(obj, type='Normal', reason=reason, message=message) + + +def warn(obj, *, reason, message=''): + if config.EventsConfig.events_loglevel > config.LOGLEVEL_WARNING: + return + event(obj, type='Warning', reason=reason, message=message) + + +def exception(obj, *, reason='', message='', exc=None): + if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR: + return + if exc is None: + _, exc, _ = sys.exc_info() + reason = reason if reason else type(exc).__name__ + message = f'{message} {exc}' if message and exc else f'{exc}' if exc else f'{message}' + event(obj, type='Error', reason=reason, message=message) + + +async def poster( + event_queue: asyncio.Queue, +): + """ + Post events in the background as they are queued. + + When the events come from the logging system, they have + their reason, type, and other fields adjusted to meet Kubernetes's concepts. + + When the events are explicitly defined via `kopf.event` and similar calls, + they have these special fields defined already. + + In either case, we pass the queued events directly to the K8s client + (or a client wrapper/adapter), with no extra processing. + + This task is defined in this module only because all other tasks are here, + so we keep all forever-running tasks together. + """ + while True: + posted_event = await event_queue.get() + await events.post_event( + ref=posted_event.ref, + type=posted_event.type, + reason=posted_event.reason, + message=posted_event.message) diff --git a/kopf/events.py b/kopf/events.py index 0f3ab0c6..676070a8 100644 --- a/kopf/events.py +++ b/kopf/events.py @@ -1,72 +1,20 @@ """ -All the functions to write the Kubernetes events on the Kubernetes objects. - -They are used internally in the handling routine to show the progress, -and can be used directly from the handlers to add arbitrary custom events. - -The events look like this: - - kubectl describe -f myres.yaml - ... - TODO - +**THIS MODULE IS DEPRECATED AND WILL BE REMOVED.** """ -import asyncio -import sys - -from kopf import config -from kopf.clients import events - - -# TODO: rename it it kopf.log()? kopf.events.log()? kopf.events.warn()? -async def event_async(obj, *, type, reason, message=''): - """ - Issue an event for the object. - """ - if isinstance(obj, (list, tuple)): - for item in obj: - await events.post_event(obj=item, type=type, reason=reason, message=message) - else: - await events.post_event(obj=obj, type=type, reason=reason, message=message) - - -# Shortcuts for the only two officially documented event types as of now. -# However, any arbitrary strings can be used as an event type to the base function. 
-async def info_async(obj, *, reason, message=''): - if config.EventsConfig.events_loglevel > config.LOGLEVEL_INFO: - return - await event_async(obj, reason=reason, message=message, type='Normal') - - -async def warn_async(obj, *, reason, message=''): - if config.EventsConfig.events_loglevel > config.LOGLEVEL_WARNING: - return - await event_async(obj, reason=reason, message=message, type='Warning') - - -async def exception_async(obj, *, reason='', message='', exc=None): - if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR: - return - - if exc is None: - _, exc, _ = sys.exc_info() - reason = reason if reason else type(exc).__name__ - message = f'{message} {exc}' if message else f'{exc}' - await event_async(obj, reason=reason, message=message, type='Error') - - -# Next 4 funcs are just synchronous interface for async event functions. -def event(obj, *, type, reason, message=''): - asyncio.wait_for(event_async(obj, type=type, reason=reason, message=message), timeout=None) - - -def info(obj, *, reason, message=''): - asyncio.wait_for(info_async(obj, reason=reason, message=message), timeout=None) +import warnings +from kopf.engines.posting import ( + event, + info, + warn, + exception, +) -def warn(obj, *, reason, message=''): - asyncio.wait_for(warn_async(obj, reason=reason, message=message), timeout=None) +__all__ = ['event', 'info', 'warn', 'exception'] -def exception(obj, *, reason='', message='', exc=None): - asyncio.wait_for(exception_async(obj, reason=reason, message=message, exc=exc), timeout=None) +# Triggered on explicit `import kopf.events` (not imported this way normally). +warnings.warn( + "`kopf.events` is deprecated; " + "use `kopf` directly: e.g. `kopf.event(...)`.", + DeprecationWarning, stacklevel=0) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 8c6631bc..e7a29f08 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -21,8 +21,8 @@ from contextvars import ContextVar from typing import Optional, Callable, Iterable, Union, Collection -from kopf import events from kopf.clients import patching +from kopf.engines import posting from kopf.reactor import causation from kopf.reactor import invocation from kopf.reactor import registries @@ -79,6 +79,7 @@ async def custom_object_handler( resource: registries.Resource, event: dict, freeze: asyncio.Event, + event_queue: asyncio.Queue, ) -> None: """ Handle a single custom object low-level watch-event. @@ -98,6 +99,7 @@ async def custom_object_handler( namespace=body.get('metadata', {}).get('namespace', 'default'), name=body.get('metadata', {}).get('name', body.get('metadata', {}).get('uid', None)), )) + posting.event_queue_var.set(event_queue) # till the end of this object's task. # If the global freeze is set for the processing (i.e. other operator overrides), do nothing. if freeze.is_set(): @@ -205,7 +207,7 @@ async def handle_cause( done = False else: logger.info(f"All handlers succeeded for {title}.") - await events.info_async(cause.body, reason='Success', message=f"All handlers succeeded for {title}.") + posting.info(cause.body, reason='Success', message=f"All handlers succeeded for {title}.") done = True else: skip = True @@ -383,14 +385,14 @@ async def _execute( # Definitely retriable error, no matter what is the error-reaction mode. except HandlerRetryError as e: logger.exception(f"Handler {handler.id!r} failed with a retry exception. Will retry.") - await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. 
Will retry.") + posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay) handlers_left.append(handler) # Definitely fatal error, no matter what is the error-reaction mode. except HandlerFatalError as e: logger.exception(f"Handler {handler.id!r} failed with a fatal exception. Will stop.") - await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? @@ -398,19 +400,19 @@ async def _execute( except Exception as e: if retry_on_errors: logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.") - await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") + posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY) handlers_left.append(handler) else: logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.") - await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? # No errors means the handler should be excluded from future runs in this reaction cycle. else: logger.info(f"Handler {handler.id!r} succeeded.") - await events.info_async(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.") + posting.info(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.") status.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result) # Provoke the retry of the handling cycle if there were any unfinished handlers, diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index 900dee06..3bfc0996 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -34,6 +34,7 @@ from kopf import config from kopf.clients import watching from kopf.engines import peering +from kopf.engines import posting from kopf.reactor import handling from kopf.reactor import lifecycles from kopf.reactor import registries @@ -174,9 +175,17 @@ def create_tasks( # The freezer and the registry are scoped to this whole task-set, to sync them all. lifecycle = lifecycle if lifecycle is not None else lifecycles.get_default_lifecycle() registry = registry if registry is not None else registries.get_default_registry() + event_queue = asyncio.Queue() freeze = asyncio.Event() tasks = [] + # K8s-event posting. Events are queued in-memory and posted in the background. + # NB: currently, it is a global task, but can be made per-resource or per-object. + tasks.extend([ + loop.create_task(posting.poster( + event_queue=event_queue)), + ]) + # Monitor the peers, unless explicitly disabled. 
ourselves: Optional[peering.Peer] = peering.Peer.detect( id=peering.detect_own_id(), priority=priority, @@ -204,6 +213,7 @@ def create_tasks( lifecycle=lifecycle, registry=registry, resource=resource, + event_queue=event_queue, freeze=freeze))), # freeze is only checked ]) diff --git a/tests/handling/test_cause_handling.py b/tests/handling/test_cause_handling.py index 4f907bf4..c99152ce 100644 --- a/tests/handling/test_cause_handling.py +++ b/tests/handling/test_cause_handling.py @@ -13,12 +13,14 @@ async def test_new(registry, handlers, resource, cause_mock, caplog.set_level(logging.DEBUG) cause_mock.event = NEW + event_queue = asyncio.Queue() await custom_object_handler( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=event_queue, ) assert not handlers.create_mock.called @@ -26,8 +28,8 @@ async def test_new(registry, handlers, resource, cause_mock, assert not handlers.delete_mock.called assert k8s_mocked.asyncio_sleep.call_count == 0 - assert k8s_mocked.post_event.call_count == 0 assert k8s_mocked.patch_obj.call_count == 1 + assert event_queue.empty() patch = k8s_mocked.patch_obj.call_args_list[0][1]['patch'] assert 'metadata' in patch @@ -46,12 +48,14 @@ async def test_create(registry, handlers, resource, cause_mock, caplog.set_level(logging.DEBUG) cause_mock.event = CREATE + event_queue = asyncio.Queue() await custom_object_handler( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=event_queue, ) assert handlers.create_mock.call_count == 1 @@ -59,8 +63,8 @@ async def test_create(registry, handlers, resource, cause_mock, assert not handlers.delete_mock.called assert k8s_mocked.asyncio_sleep.call_count == 0 - assert k8s_mocked.post_event.call_count >= 1 assert k8s_mocked.patch_obj.call_count == 1 + assert not event_queue.empty() patch = k8s_mocked.patch_obj.call_args_list[0][1]['patch'] assert 'metadata' in patch @@ -85,12 +89,14 @@ async def test_update(registry, handlers, resource, cause_mock, caplog.set_level(logging.DEBUG) cause_mock.event = UPDATE + event_queue = asyncio.Queue() await custom_object_handler( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=event_queue, ) assert not handlers.create_mock.called @@ -98,8 +104,8 @@ async def test_update(registry, handlers, resource, cause_mock, assert not handlers.delete_mock.called assert k8s_mocked.asyncio_sleep.call_count == 0 - assert k8s_mocked.post_event.call_count >= 1 assert k8s_mocked.patch_obj.call_count == 1 + assert not event_queue.empty() patch = k8s_mocked.patch_obj.call_args_list[0][1]['patch'] assert 'metadata' in patch @@ -124,12 +130,14 @@ async def test_delete(registry, handlers, resource, cause_mock, caplog.set_level(logging.DEBUG) cause_mock.event = DELETE + event_queue = asyncio.Queue() await custom_object_handler( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=event_queue, ) assert not handlers.create_mock.called @@ -137,8 +145,8 @@ async def test_delete(registry, handlers, resource, cause_mock, assert handlers.delete_mock.call_count == 1 assert k8s_mocked.asyncio_sleep.call_count == 0 - assert k8s_mocked.post_event.call_count >= 
1 assert k8s_mocked.patch_obj.call_count == 1 + assert not event_queue.empty() patch = k8s_mocked.patch_obj.call_args_list[0][1]['patch'] assert 'status' in patch @@ -165,12 +173,14 @@ async def test_gone(registry, handlers, resource, cause_mock, caplog.set_level(logging.DEBUG) cause_mock.event = GONE + event_queue = asyncio.Queue() await custom_object_handler( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=event_queue, ) assert not handlers.create_mock.called @@ -178,8 +188,8 @@ async def test_gone(registry, handlers, resource, cause_mock, assert not handlers.delete_mock.called assert not k8s_mocked.asyncio_sleep.called - assert not k8s_mocked.post_event.called assert not k8s_mocked.patch_obj.called + assert event_queue.empty() assert_logs([ "Deleted, really deleted", @@ -191,12 +201,14 @@ async def test_free(registry, handlers, resource, cause_mock, caplog.set_level(logging.DEBUG) cause_mock.event = FREE + event_queue = asyncio.Queue() await custom_object_handler( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=event_queue, ) assert not handlers.create_mock.called @@ -204,8 +216,8 @@ async def test_free(registry, handlers, resource, cause_mock, assert not handlers.delete_mock.called assert not k8s_mocked.asyncio_sleep.called - assert not k8s_mocked.post_event.called assert not k8s_mocked.patch_obj.called + assert event_queue.empty() assert_logs([ "Deletion event, but we are done with it", @@ -217,12 +229,14 @@ async def test_noop(registry, handlers, resource, cause_mock, caplog.set_level(logging.DEBUG) cause_mock.event = NOOP + event_queue = asyncio.Queue() await custom_object_handler( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=event_queue, ) assert not handlers.create_mock.called @@ -230,8 +244,8 @@ async def test_noop(registry, handlers, resource, cause_mock, assert not handlers.delete_mock.called assert not k8s_mocked.asyncio_sleep.called - assert not k8s_mocked.post_event.called assert not k8s_mocked.patch_obj.called + assert event_queue.empty() assert_logs([ "Something has changed, but we are not interested", diff --git a/tests/handling/test_cause_logging.py b/tests/handling/test_cause_logging.py index a0f24611..b7647183 100644 --- a/tests/handling/test_cause_logging.py +++ b/tests/handling/test_cause_logging.py @@ -20,6 +20,7 @@ async def test_all_logs_are_prefixed(registry, resource, handlers, resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert caplog.messages # no messages means that we cannot test it assert all(message.startswith('[ns1/name1] ') for message in caplog.messages) @@ -43,6 +44,7 @@ async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert_logs([ " event: ", @@ -63,6 +65,7 @@ async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_ty resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert_logs([ " event: ", diff --git 
a/tests/handling/test_delays.py b/tests/handling/test_delays.py index b987098e..692b54d6 100644 --- a/tests/handling/test_delays.py +++ b/tests/handling/test_delays.py @@ -34,6 +34,7 @@ async def test_delayed_handlers_progress( resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert handlers.create_mock.call_count == (1 if cause_type == CREATE else 0) @@ -81,6 +82,7 @@ async def test_delayed_handlers_sleep( resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert not handlers.create_mock.called diff --git a/tests/handling/test_errors.py b/tests/handling/test_errors.py index 21219070..3b35511e 100644 --- a/tests/handling/test_errors.py +++ b/tests/handling/test_errors.py @@ -29,6 +29,7 @@ async def test_fatal_error_stops_handler( resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert handlers.create_mock.call_count == (1 if cause_type == CREATE else 0) @@ -69,6 +70,7 @@ async def test_retry_error_delays_handler( resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert handlers.create_mock.call_count == (1 if cause_type == CREATE else 0) @@ -110,6 +112,7 @@ async def test_arbitrary_error_delays_handler( resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert handlers.create_mock.call_count == (1 if cause_type == CREATE else 0) diff --git a/tests/handling/test_event_handling.py b/tests/handling/test_event_handling.py index 100b1b47..f2327d76 100644 --- a/tests/handling/test_event_handling.py +++ b/tests/handling/test_event_handling.py @@ -21,6 +21,7 @@ async def test_handlers_called_always( resource=resource, event={'type': 'ev-type', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert handlers.event_mock.call_count == 1 @@ -54,6 +55,7 @@ async def test_errors_are_ignored( resource=resource, event={'type': 'ev-type', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert handlers.event_mock.called diff --git a/tests/handling/test_freezing.py b/tests/handling/test_freezing.py index 19228139..9502e09f 100644 --- a/tests/handling/test_freezing.py +++ b/tests/handling/test_freezing.py @@ -29,6 +29,7 @@ async def test_nothing_is_called_when_freeze_is_set(mocker, resource, caplog, as resource=resource, event=event, freeze=freeze, + event_queue=asyncio.Queue(), ) assert not detect_cause.called diff --git a/tests/handling/test_multistep.py b/tests/handling/test_multistep.py index 7269c625..b3ab0562 100644 --- a/tests/handling/test_multistep.py +++ b/tests/handling/test_multistep.py @@ -22,6 +22,7 @@ async def test_1st_step_stores_progress_by_patching( resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert handlers.create_mock.call_count == (1 if cause_type == CREATE else 0) @@ -66,6 +67,7 @@ async def test_2nd_step_finishes_the_handlers( resource=resource, event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), + event_queue=asyncio.Queue(), ) assert extrahandlers.create_mock.call_count == (1 if cause_type == CREATE else 0) diff --git a/tests/handling/test_no_handlers.py 
b/tests/handling/test_no_handlers.py
index ce74de85..f015bd5c 100644
--- a/tests/handling/test_no_handlers.py
+++ b/tests/handling/test_no_handlers.py
@@ -32,6 +32,7 @@ async def test_skipped_with_no_handlers(
         resource=resource,
         event={'type': 'irrelevant', 'object': cause_mock.body},
         freeze=asyncio.Event(),
+        event_queue=asyncio.Queue(),
     )
 
     assert not k8s_mocked.asyncio_sleep.called
diff --git a/tests/handling/test_timeouts.py b/tests/handling/test_timeouts.py
index d51ad4b8..ae07a190 100644
--- a/tests/handling/test_timeouts.py
+++ b/tests/handling/test_timeouts.py
@@ -38,6 +38,7 @@ async def test_timed_out_handler_fails(
         resource=resource,
         event={'type': 'irrelevant', 'object': cause_mock.body},
         freeze=asyncio.Event(),
+        event_queue=asyncio.Queue(),
     )
 
     assert not handlers.create_mock.called
diff --git a/tests/posting/test_poster.py b/tests/posting/test_poster.py
new file mode 100644
index 00000000..b17d6ef9
--- /dev/null
+++ b/tests/posting/test_poster.py
@@ -0,0 +1,92 @@
+import asyncio
+
+import pytest
+from asynctest import call
+
+from kopf import event, info, warn, exception
+from kopf.engines.posting import poster, K8sEvent, event_queue_var
+
+OBJ1 = {'apiVersion': 'group1/version1', 'kind': 'Kind1',
+        'metadata': {'uid': 'uid1', 'name': 'name1', 'namespace': 'ns1'}}
+REF1 = {'apiVersion': 'group1/version1', 'kind': 'Kind1',
+        'uid': 'uid1', 'name': 'name1', 'namespace': 'ns1'}
+OBJ2 = {'apiVersion': 'group2/version2', 'kind': 'Kind2',
+        'metadata': {'uid': 'uid2', 'name': 'name2', 'namespace': 'ns2'}}
+REF2 = {'apiVersion': 'group2/version2', 'kind': 'Kind2',
+        'uid': 'uid2', 'name': 'name2', 'namespace': 'ns2'}
+
+
+async def test_poster_polls_and_posts(mocker):
+    event1 = K8sEvent(type='type1', reason='reason1', message='message1', ref=REF1)
+    event2 = K8sEvent(type='type2', reason='reason2', message='message2', ref=REF2)
+    event_queue = asyncio.Queue()
+    event_queue.put_nowait(event1)
+    event_queue.put_nowait(event2)
+
+    # A way to cancel the `while True` cycle when we need it (ASAP).
+    def _cancel(*args, **kwargs):
+        if post_event.call_count >= 2:
+            raise asyncio.CancelledError()
+    post_event = mocker.patch('kopf.clients.events.post_event', side_effect=_cancel)
+
+    # A way to cancel the `while True` cycle by timing, even if the routines are not called.
+    with pytest.raises(asyncio.CancelledError):
+        await asyncio.wait_for(
+            poster(event_queue=event_queue), timeout=0.5)
+
+    assert post_event.call_count == 2
+    assert post_event.await_count == 2
+    assert post_event.call_args_list == [
+        call(ref=REF1, type='type1', reason='reason1', message='message1'),
+        call(ref=REF2, type='type2', reason='reason2', message='message2'),
+    ]
+
+
+def test_queueing_fails_with_no_queue(mocker):
+    # Prerequisite: the context-var should not be set by anything in advance.
+ sentinel = object() + assert event_queue_var.get(sentinel) is sentinel + + with pytest.raises(LookupError): + event(OBJ1, type='type1', reason='reason1', message='message1') + + +def test_via_event_function(mocker): + post_event = mocker.patch('kopf.clients.events.post_event') + + event_queue = asyncio.Queue() + event_queue_var.set(event_queue) + event(OBJ1, type='type1', reason='reason1', message='message1') + + assert not post_event.called + assert event_queue.qsize() == 1 + event1 = event_queue.get_nowait() + + assert isinstance(event1, K8sEvent) + assert event1.ref == REF1 + assert event1.type == 'type1' + assert event1.reason == 'reason1' + assert event1.message == 'message1' + + +@pytest.mark.parametrize('event_fn, event_type', [ + pytest.param(info, "Normal", id='info'), + pytest.param(warn, "Warning", id='warn'), + pytest.param(exception, "Error", id='exception'), +]) +def test_via_shortcut(mocker, event_fn, event_type): + post_event = mocker.patch('kopf.clients.events.post_event') + + event_queue = asyncio.Queue() + event_queue_var.set(event_queue) + event_fn(OBJ1, reason='reason1', message='message1') + + assert not post_event.called + assert event_queue.qsize() == 1 + event1 = event_queue.get_nowait() + + assert isinstance(event1, K8sEvent) + assert event1.ref == REF1 + assert event1.type == event_type + assert event1.reason == 'reason1' + assert event1.message == 'message1'
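# Usage sketch (illustrative only, not part of the diff above): how a handler would use
# the posting API that this change exposes via the top-level `kopf` package. The resource
# and handler names below are hypothetical. The calls are synchronous and only enqueue a
# K8sEvent on the queue held in `event_queue_var`; the background `poster()` task created
# in `create_tasks()` then posts them via `kopf.clients.events.post_event()`.

import kopf

@kopf.on.create('zalando.org', 'v1', 'kopfexamples')  # hypothetical example resource
def create_fn(body, **kwargs):
    # Shortcuts for the officially documented event types:
    kopf.info(body, reason='SomeReason', message='Creating the children')
    kopf.warn(body, reason='SomeReason', message='Something looks suspicious')

    # Arbitrary event types via the base function:
    kopf.event(body, type='Normal', reason='SomeReason', message='Directly posted event')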