Skip to content

Commit

Permalink
fix configuration - now really can be configured even after kopf is imported
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Bazhal committed Jun 20, 2019
1 parent 031da67 commit 9e52ea6
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 53 deletions.
29 changes: 5 additions & 24 deletions docs/configuring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ Configuration
There are tools to configure some of kopf's functionality, such as asynchronous
task behaviour and the logging of events.

.. note::
All configuration should be done before importing kopf modules.


Configure logging events
=================
========================

`kopf.config.EventsConfig` allows setting which types of kopf logs should be
reflected in events.
Expand All @@ -23,19 +20,11 @@ Loglevels are:
* ``kopf.config.LOGLEVEL_CRITICAL``

.. code-block:: python
:caption: test_example_operator.py
from kopf import config
# Now kopf will send events only when error or critical occasion happens
config.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR
import kopf
@kopf.on.create('zalando.org', 'v1', 'ephemeralvolumeclaims')
def create_fn(body, **kwargs):
print(f"A handler is called with body: {body}")
# Now kopf will send events only when error or critical occasion happens
kopf.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR
Configure Workers
=================
Expand All @@ -44,17 +33,9 @@ Configure Workers
and timeouts for many kinds of tasks.

.. code-block:: python
:caption: test_example_operator.py
from kopf import config
# Let's set how many workers can be running simultaneously on per-object event queue
config.WorkersConfig.queue_workers_limit = 10
import kopf
@kopf.on.create('zalando.org', 'v1', 'ephemeralvolumeclaims')
def create_fn(body, **kwargs):
print(f"A handler is called with body: {body}")
# Let's set how many workers can be running simultaneously on per-object event queue
kopf.WorkersConfig.synchronous_tasks_threadpool_limit = 20
6 changes: 6 additions & 0 deletions kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
from kopf.config import (
login,
configure,
LOGLEVEL_INFO,
LOGLEVEL_WARNING,
LOGLEVEL_ERROR,
LOGLEVEL_CRITICAL,
EventsConfig,
WorkersConfig
)
from kopf.events import (
event,
Expand Down
38 changes: 27 additions & 11 deletions kopf/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

import asyncio
import concurrent.futures
import logging
from typing import Optional

import click
import kubernetes
Expand Down Expand Up @@ -103,23 +105,37 @@ class WorkersConfig:
Used as single point of configuration for kopf.reactor.
"""

synchronous_event_post_threadpool_limit = None
""" How many workers can be running simultaneously on event creation operations. """
threadpool_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None

synchronous_patch_threadpool_limit = None
""" How many workers can be running simultaneously on patch operations. """

queue_workers_limit = None # if None, there is no limits to workers number
queue_workers_limit: Optional[int] = None # if None, there is no limits to workers number
""" How many workers can be running simultaneously on per-object event queue. """

synchronous_handlers_threadpool_limit = None # if None, calculated by ThreadPoolExecutor based on cpu count
""" How many threads in total can be running simultaneously to handle non-async handler functions. """
synchronous_tasks_threadpool_limit: Optional[int] = None # if None, calculated by ThreadPoolExecutor based on cpu count
""" How many threads in total can be running simultaneously to handle any non-async tasks. """

worker_idle_timeout = 5.0
worker_idle_timeout: float = 5.0
""" How long does a worker can idle before exiting and garbage-collecting."""

worker_batch_window = 0.1
worker_batch_window: float = 0.1
""" How fast/slow does a worker deplete the queue when an event is received."""

worker_exit_timeout = 2.0
worker_exit_timeout: float = 2.0
""" How long does a worker can work on watcher exit before being cancelled. """

@staticmethod
def get_syn_executor() -> concurrent.futures.ThreadPoolExecutor:
if not WorkersConfig.threadpool_executor:
logging.debug('Setting up syn executor')
WorkersConfig.threadpool_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=WorkersConfig.synchronous_tasks_threadpool_limit
)
return WorkersConfig.threadpool_executor

@staticmethod
def set_synchronous_tasks_threadpool_limit(new_limit: int):
if new_limit < 1:
return

WorkersConfig.synchronous_tasks_threadpool_limit = new_limit
if WorkersConfig.threadpool_executor:
WorkersConfig.threadpool_executor._max_workers = new_limit
7 changes: 2 additions & 5 deletions kopf/k8s/events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import concurrent.futures
import datetime
import functools
import logging
Expand All @@ -14,9 +13,6 @@
CUT_MESSAGE_INFIX = '...'


event_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_event_post_threadpool_limit)


async def post_event(*, obj, type, reason, message=''):
"""
Issue an event for the object.
Expand Down Expand Up @@ -69,7 +65,8 @@ async def post_event(*, obj, type, reason, message=''):

try:
await loop.run_in_executor(
event_executor, functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body})
config.WorkersConfig.get_syn_executor(),
functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body})
)
except kubernetes.client.rest.ApiException as e:
# Events are helpful but auxiliary, they should not fail the handling cycle.
Expand Down
5 changes: 1 addition & 4 deletions kopf/k8s/patching.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import asyncio
import concurrent.futures
import functools

import kubernetes

from kopf import config

patch_executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_patch_threadpool_limit)


async def patch_obj(*, resource, patch, namespace=None, name=None, body=None):
"""
Expand Down Expand Up @@ -41,4 +38,4 @@ async def patch_obj(*, resource, patch, namespace=None, name=None, body=None):
patch_func = api.patch_namespaced_custom_object
loop = asyncio.get_running_loop()

await loop.run_in_executor(patch_executor, functools.partial(patch_func, **request_kwargs))
await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), functools.partial(patch_func, **request_kwargs))
10 changes: 1 addition & 9 deletions kopf/reactor/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,12 @@
All of this goes via the same invocation logic and protocol.
"""
import asyncio
import concurrent.futures
import contextvars
import functools
from typing import Callable

# The executor for the sync-handlers (i.e. regular functions).
# TODO: make the limits if sync-handlers configurable?
from kopf import config

executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.WorkersConfig.synchronous_handlers_threadpool_limit)
# executor = concurrent.futures.ProcessPoolExecutor(max_workers=3)


async def invoke(
fn: Callable,
Expand Down Expand Up @@ -82,9 +76,7 @@ async def invoke(
real_fn = functools.partial(context.run, real_fn)

loop = asyncio.get_event_loop()
task = loop.run_in_executor(executor, real_fn)
await asyncio.wait([task])
result = task.result() # re-raises
result = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), real_fn)
return result


Expand Down

0 comments on commit 9e52ea6

Please sign in to comment.