Skip to content

Commit

Permalink
Merge pull request #109 from dbazhal/master
Browse files Browse the repository at this point in the history
Some concurrency fixes, some configurability
  • Loading branch information
Sergey Vasilyev authored Jun 21, 2019
2 parents 3a127e5 + ba34911 commit c759d68
Show file tree
Hide file tree
Showing 17 changed files with 256 additions and 110 deletions.
41 changes: 41 additions & 0 deletions docs/configuring.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
================
Configuration
================

There are tools to configure some of kopf functionality, like asynchronous
tasks behaviour and logging events.


Configure logging events
========================

`kopf.config.EventsConfig` allows to set what types of kopf logs should be
reflected in events.

Loglevels are:

* ``kopf.config.LOGLEVEL_INFO``
* ``kopf.config.LOGLEVEL_WARNING``
* ``kopf.config.LOGLEVEL_ERROR``
* ``kopf.config.LOGLEVEL_CRITICAL``

.. code-block:: python
import kopf
# Now kopf will send events only when error or critical occasion happens
kopf.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR
Configure Workers
=================

`kopf.config.WorkersConfig` allows to set numbers of workers, launch periods,
and timeouts for many kinds of tasks.

.. code-block:: python
import kopf
# Let's set how many workers can be running simultaneously on per-object event queue
kopf.WorkersConfig.synchronous_tasks_threadpool_limit = 20
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Kopf: Kubernetes Operators Framework
errors
events
testing
configuring

.. toctree::
:maxdepth: 2
Expand Down
1 change: 1 addition & 0 deletions examples/04-events/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def create_fn(body, **kwargs):
# The shortcuts for the conventional events and common cases.
kopf.info(body, reason="SomeReason", message="Some message")
kopf.warn(body, reason="SomeReason", message="Some message")

try:
raise RuntimeError("Exception text.")
except:
Expand Down
1 change: 0 additions & 1 deletion examples/99-all-at-once/example.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""
Kubernetes operator example: all the features at once (for debugging & testing).
"""

import pprint
import time

Expand Down
6 changes: 6 additions & 0 deletions kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
from kopf.config import (
login,
configure,
LOGLEVEL_INFO,
LOGLEVEL_WARNING,
LOGLEVEL_ERROR,
LOGLEVEL_CRITICAL,
EventsConfig,
WorkersConfig
)
from kopf.events import (
event,
Expand Down
7 changes: 5 additions & 2 deletions kopf/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import functools

import click
Expand Down Expand Up @@ -70,7 +71,8 @@ def freeze(id, message, lifetime, namespace, peering_name, priority):
priority=priority,
lifetime=lifetime,
)
ourserlves.keepalive()
loop = asyncio.get_event_loop()
loop.run_until_complete(ourserlves.keepalive())


@main.command()
Expand All @@ -86,4 +88,5 @@ def resume(id, namespace, peering_name):
name=peering_name,
namespace=namespace,
)
ourselves.disappear()
loop = asyncio.get_event_loop()
loop.run_until_complete(ourselves.disappear())
65 changes: 65 additions & 0 deletions kopf/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

import asyncio
import concurrent.futures
import logging
from typing import Optional

import click
import kubernetes
Expand All @@ -11,6 +13,16 @@
format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s'


LOGLEVEL_INFO = 20
""" Event loglevel to log all events. """
LOGLEVEL_WARNING = 30
""" Event loglevel to log all events except informational. """
LOGLEVEL_ERROR = 40
""" Event loglevel to log only errors and critical events. """
LOGLEVEL_CRITICAL = 50
""" Event loglevel to log only critical events(basically - no events). """


class LoginError(click.ClickException):
""" Raised when the operator cannot login to the API. """

Expand Down Expand Up @@ -77,3 +89,56 @@ def configure(debug=None, verbose=None, quiet=None):

loop = asyncio.get_event_loop()
loop.set_debug(debug)


class EventsConfig:
"""
Used to configure events sending behaviour.
"""

events_loglevel = LOGLEVEL_INFO
""" What events should be logged. """


class WorkersConfig:
"""
Used as single point of configuration for kopf.reactor.
"""

threadpool_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None

queue_workers_limit: Optional[int] = None # if None, there is no limits to workers number
""" How many workers can be running simultaneously on per-object event queue. """

synchronous_tasks_threadpool_limit: Optional[int] = None # if None, calculated by ThreadPoolExecutor based on cpu count
""" How many threads in total can be running simultaneously to handle any non-async tasks."""

worker_idle_timeout: float = 5.0
""" How long does a worker can idle before exiting and garbage-collecting."""

worker_batch_window: float = 0.1
""" How fast/slow does a worker deplete the queue when an event is received."""

worker_exit_timeout: float = 2.0
""" How long does a worker can work on watcher exit before being cancelled. """

@staticmethod
def get_syn_executor() -> concurrent.futures.ThreadPoolExecutor:
if not WorkersConfig.threadpool_executor:
logging.debug('Setting up syn executor')
WorkersConfig.threadpool_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=WorkersConfig.synchronous_tasks_threadpool_limit
)
return WorkersConfig.threadpool_executor

@staticmethod
def set_synchronous_tasks_threadpool_limit(new_limit: int):
"""
Call this static method at any time to change synchronous_tasks_threadpool_limit in runtime.
"""
if new_limit < 1:
raise ValueError('Can`t set threadpool limit lower than 1')

WorkersConfig.synchronous_tasks_threadpool_limit = new_limit
if WorkersConfig.threadpool_executor:
WorkersConfig.threadpool_executor._max_workers = new_limit
44 changes: 35 additions & 9 deletions kopf/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,62 @@
TODO
"""
import asyncio
import sys

from kopf import config
from kopf.k8s import events


# TODO: rename it it kopf.log()? kopf.events.log()? kopf.events.warn()?
def event(obj, *, type, reason, message=''):
async def event_async(obj, *, type, reason, message=''):
"""
Issue an event for the object.
"""
if isinstance(obj, (list, tuple)):
for item in obj:
events.post_event(obj=item, type=type, reason=reason, message=message)
await events.post_event(obj=item, type=type, reason=reason, message=message)
else:
events.post_event(obj=obj, type=type, reason=reason, message=message)
await events.post_event(obj=obj, type=type, reason=reason, message=message)


# Shortcuts for the only two officially documented event types as of now.
# However, any arbitrary strings can be used as an event type to the base function.
def info(obj, *, reason, message=''):
return event(obj, reason=reason, message=message, type='Normal')
async def info_async(obj, *, reason, message=''):
if config.EventsConfig.events_loglevel > config.LOGLEVEL_INFO:
return
await event_async(obj, reason=reason, message=message, type='Normal')


def warn(obj, *, reason, message=''):
return event(obj, reason=reason, message=message, type='Warning')
async def warn_async(obj, *, reason, message=''):
if config.EventsConfig.events_loglevel > config.LOGLEVEL_WARNING:
return
await event_async(obj, reason=reason, message=message, type='Warning')


def exception(obj, *, reason='', message='', exc=None):
async def exception_async(obj, *, reason='', message='', exc=None):
if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR:
return

if exc is None:
_, exc, _ = sys.exc_info()
reason = reason if reason else type(exc).__name__
message = f'{message} {exc}' if message else f'{exc}'
return event(obj, reason=reason, message=message, type='Error')
await event_async(obj, reason=reason, message=message, type='Error')


# Next 4 funcs are just synchronous interface for async event functions.
def event(obj, *, type, reason, message=''):
asyncio.wait_for(event_async(obj, type=type, reason=reason, message=message), timeout=None)


def info(obj, *, reason, message=''):
asyncio.wait_for(info_async(obj, reason=reason, message=message), timeout=None)


def warn(obj, *, reason, message=''):
asyncio.wait_for(warn_async(obj, reason=reason, message=message), timeout=None)


def exception(obj, *, reason='', message='', exc=None):
asyncio.wait_for(exception_async(obj, reason=reason, message=message, exc=exc), timeout=None)
16 changes: 11 additions & 5 deletions kopf/k8s/events.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import asyncio
import datetime
import functools
import logging

import kubernetes.client.rest

from kopf import config

logger = logging.getLogger(__name__)

MAX_MESSAGE_LENGTH = 1024
CUT_MESSAGE_INFIX = '...'


def post_event(*, obj, type, reason, message=''):
async def post_event(*, obj, type, reason, message=''):
"""
Issue an event for the object.
"""
Expand Down Expand Up @@ -56,11 +60,13 @@ def post_event(*, obj, type, reason, message=''):
event_time=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z'
)

api = kubernetes.client.CoreV1Api()
loop = asyncio.get_running_loop()

try:
api = kubernetes.client.CoreV1Api()
api.create_namespaced_event(
namespace=namespace,
body=body,
await loop.run_in_executor(
config.WorkersConfig.get_syn_executor(),
functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body})
)
except kubernetes.client.rest.ApiException as e:
# Events are helpful but auxiliary, they should not fail the handling cycle.
Expand Down
38 changes: 20 additions & 18 deletions kopf/k8s/patching.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import asyncio
import functools

import kubernetes

from kopf import config


def patch_obj(*, resource, patch, namespace=None, name=None, body=None):
async def patch_obj(*, resource, patch, namespace=None, name=None, body=None):
"""
Patch a resource of specific kind.
Expand All @@ -20,20 +25,17 @@ def patch_obj(*, resource, patch, namespace=None, name=None, body=None):
name = body.get('metadata', {}).get('name') if body is not None else name

api = kubernetes.client.CustomObjectsApi()
if namespace is None:
api.patch_cluster_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
name=name,
body=patch,
)
else:
api.patch_namespaced_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
namespace=namespace,
name=name,
body=patch,
)
request_kwargs = {
'group': resource.group,
'version': resource.version,
'plural': resource.plural,
'name': name,
'body': patch
}
patch_func = api.patch_cluster_custom_object
if namespace is not None:
request_kwargs['namespace'] = namespace
patch_func = api.patch_namespaced_custom_object
loop = asyncio.get_running_loop()

await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), functools.partial(patch_func, **request_kwargs))
Loading

0 comments on commit c759d68

Please sign in to comment.