Skip to content

Commit

Permalink
Merge pull request #933 from nolar/credentials-expiration
Browse files Browse the repository at this point in the history
Define credentials expiration time (optionally)
  • Loading branch information
nolar authored Jul 11, 2022
2 parents d07aebc + 3937b7c commit 2551b8b
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 11 deletions.
12 changes: 12 additions & 0 deletions docs/authentication.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ To implement a custom authentication method, one or a few login-handlers
can be added. The login handlers should either return nothing (``None``)
or an instance of :class:`kopf.ConnectionInfo`::

import datetime
import kopf

@kopf.on.login()
Expand All @@ -50,6 +51,7 @@ or an instance of :class:`kopf.ConnectionInfo`::
private_key_path='~/.minikube/client.key',
certificate_data=b'...',
private_key_data=b'...',
expiration=datetime.datetime(2099, 12, 31, 23, 59, 59),
)

As with any other handlers, the login handler can be async if the network
Expand Down Expand Up @@ -160,6 +162,7 @@ Internally, all the credentials are gathered from all the active handlers
in no particular order, and are fed into a *vault*.

The Kubernetes API calls then use random credentials from that *vault*.
The credentials that have reached their expiration are ignored and removed.
If the API call fails with an HTTP 401 error, these credentials are marked
invalid, excluded from further use, and the next random credentials are tried.

Expand All @@ -178,3 +181,12 @@ by the login handlers, the API calls fail, and so does the operator.

This internal logic is hidden from the operator developers, but it is worth
knowing how it works internally. See :class:`Vault`.

If the expiration is intended to be often (e.g. every few minutes),
you might want to disable the logging of re-authenication (whether this is
a good idea or not, you decide using the information about your system)::

import logging

logging.getLogger('kopf.activities.authentication').disabled = True
logging.getLogger('kopf._core.engines.activities').disabled = True
2 changes: 2 additions & 0 deletions examples/13-hooks/example.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import datetime
import random
from typing import Dict

Expand Down Expand Up @@ -54,6 +55,7 @@ async def login_fn(**kwargs):
certificate_path=cert.filename() if cert else None, # can be a temporary file
private_key_path=pkey.filename() if pkey else None, # can be a temporary file
default_namespace=config.namespace,
expiration=datetime.datetime.utcnow() + datetime.timedelta(seconds=30),
)


Expand Down
52 changes: 49 additions & 3 deletions kopf/_cogs/structs/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import asyncio
import collections
import dataclasses
import datetime
import random
from typing import AsyncIterable, AsyncIterator, Callable, Dict, List, \
Mapping, NewType, Optional, Tuple, TypeVar, cast
Expand Down Expand Up @@ -60,6 +61,7 @@ class ConnectionInfo:
private_key_data: Optional[bytes] = None
default_namespace: Optional[str] = None # used for cluster objects' k8s-events.
priority: int = 0
expiration: Optional[datetime.datetime] = None # TZ-naive, the same as utcnow()


_T = TypeVar('_T', bound=object)
Expand Down Expand Up @@ -116,19 +118,20 @@ def __init__(
self._current = {}
self._invalid = collections.defaultdict(list)
self._lock = asyncio.Lock()
self._next_expiration = datetime.datetime.max

if __src is not None:
self._update_converted(__src)

# Mark a pre-populated vault to be usable instantly,
# or trigger the initial authentication for an empty vault.
self._ready = aiotoggles.Toggle(bool(self))
self._ready = aiotoggles.Toggle(not self.is_empty())

def __repr__(self) -> str:
return f'<{self.__class__.__name__}: {self._current!r}>'

def __bool__(self) -> bool:
return bool(self._current)
raise NotImplementedError("The vault should not be evaluated as bool.")

async def __aiter__(
self,
Expand Down Expand Up @@ -183,6 +186,10 @@ async def _items(
# ensure that the items are ready before yielding them.
await self._ready.wait_for(True)

# Check for expiration strictly after a possible re-authentication.
# This might cause another re-authentication if the credentials are expired at creation.
await self.expire()

# Select the items to yield and let it (i.e. a consumer) work.
async with self._lock:
yielded_key, yielded_item = self.select()
Expand Down Expand Up @@ -215,14 +222,36 @@ def select(self) -> Tuple[VaultKey, VaultItem]:
key, item = random.choice(prioritised[top_priority])
return key, item

async def expire(self) -> None:
"""
Discard the expired credentials, and re-authenticate as needed.
Unlike invalidation, the expired credentials are not remembered
and not blocked from reappearing.
"""
now = datetime.datetime.utcnow()
if now >= self._next_expiration: # quick & lockless for speed: it is done on every API call
async with self._lock:
for key, item in list(self._current.items()):
if item.info.expiration is not None and now >= item.info.expiration:
await self._flush_caches(item)
del self._current[key]
self._update_expiration()
need_reauth = not self._current # i.e. nothing is left at all

# Initiate a re-authentication activity, and block until it is finished.
if need_reauth:
await self._ready.turn_to(False)
await self._ready.wait_for(True)

async def invalidate(
self,
key: VaultKey,
*,
exc: Optional[Exception] = None,
) -> None:
"""
Exclude the specified credentials, and re-authenticate as needed.
Discard the specified credentials, and re-authenticate as needed.
Multiple calls can be made for a single authenticator and credentials,
if used for multiple requests at the same time (a common case).
Expand All @@ -244,6 +273,7 @@ async def invalidate(
await self._flush_caches(self._current[key])
self._invalid[key] = self._invalid[key][-2:] + [self._current[key]]
del self._current[key]
self._update_expiration()
need_reauth = not self._current # i.e. nothing is left at all

# Initiate a re-authentication activity, and block until it is finished.
Expand Down Expand Up @@ -284,6 +314,13 @@ async def populate(
# Those tasks can be blocked in `vault.invalidate()` if there are no credentials left.
await self._ready.turn_to(True)

def is_empty(self) -> bool:
now = datetime.datetime.utcnow()
return all(
item.info.expiration is not None and now >= item.info.expiration # i.e. expired
for key, item in self._current.items()
)

async def wait_for_readiness(self) -> None:
await self._ready.wait_for(True)

Expand Down Expand Up @@ -340,3 +377,12 @@ def _update_converted(
raise ValueError("Only ConnectionInfo instances are currently accepted.")
if info not in [data.info for data in self._invalid[key]]:
self._current[key] = VaultItem(info=info)
self._update_expiration()

def _update_expiration(self) -> None:
expirations = [
item.info.expiration
for item in self._current.values()
if item.info.expiration is not None
]
self._next_expiration = min(expirations + [datetime.datetime.max])
2 changes: 1 addition & 1 deletion kopf/_core/engines/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def authenticator(
memo: ephemera.AnyMemo,
) -> NoReturn:
""" Keep the credentials forever up to date. """
counter: int = 1 if vault else 0
counter: int = 0 if vault.is_empty() else 1
while True:
await authenticate(
registry=registry,
Expand Down
6 changes: 3 additions & 3 deletions tests/authentication/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def test_empty_registry_produces_no_credentials(settings):
indices=OperatorIndexers().indices,
)

assert not vault
assert vault.is_empty()
with pytest.raises(LoginError):
async for _, _ in vault:
pass
Expand All @@ -48,7 +48,7 @@ def login_fn(**_):
indices=OperatorIndexers().indices,
)

assert not vault
assert vault.is_empty()
with pytest.raises(LoginError):
async for _, _ in vault:
pass
Expand Down Expand Up @@ -76,7 +76,7 @@ def login_fn(**_):
indices=OperatorIndexers().indices,
)

assert vault
assert not vault.is_empty()

items = []
async for key, info in vault:
Expand Down
5 changes: 5 additions & 0 deletions tests/authentication/test_connectioninfo.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime

from kopf._cogs.structs.credentials import ConnectionInfo, VaultKey


Expand All @@ -24,6 +26,7 @@ def test_creation_with_minimal_fields():
assert info.private_key_path is None
assert info.private_key_data is None
assert info.default_namespace is None
assert info.expiration is None


def test_creation_with_maximal_fields():
Expand All @@ -41,6 +44,7 @@ def test_creation_with_maximal_fields():
private_key_path='/pkey/path',
private_key_data=b'pkey_data',
default_namespace='default',
expiration=datetime.datetime.max,
)
assert info.server == 'https://localhost'
assert info.ca_path == '/ca/path'
Expand All @@ -55,3 +59,4 @@ def test_creation_with_maximal_fields():
assert info.private_key_path == '/pkey/path'
assert info.private_key_data == b'pkey_data'
assert info.default_namespace == 'default'
assert info.expiration == datetime.datetime.max
72 changes: 68 additions & 4 deletions tests/authentication/test_vault.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
import datetime

import freezegun
import pytest

from kopf._cogs.structs.credentials import ConnectionInfo, LoginError, Vault, VaultKey


async def test_evals_as_false_when_empty():
async def test_probits_evaluating_as_boolean():
vault = Vault()
with pytest.raises(NotImplementedError):
bool(vault)


async def test_empty_at_creation():
vault = Vault()
assert not vault
assert vault.is_empty()


async def test_evals_as_true_when_filled():
async def test_not_empty_when_populated():
key1 = VaultKey('some-key')
info1 = ConnectionInfo(server='https://expected/')
vault = Vault()
await vault.populate({key1: info1})
assert vault
assert not vault.is_empty()


async def test_yielding_after_creation(mocker):
Expand Down Expand Up @@ -44,6 +53,61 @@ async def test_yielding_after_population(mocker):
assert results[0][1] is info1


@freezegun.freeze_time('2020-01-01T00:00:00')
async def test_yielding_items_before_expiration(mocker):
future = datetime.datetime(2020, 1, 1, 0, 0, 0, 1)
key1 = VaultKey('some-key')
info1 = ConnectionInfo(server='https://expected/', expiration=future)
vault = Vault()
mocker.patch.object(vault._ready, 'wait_for')

results = []
await vault.populate({key1: info1})
async for key, info in vault:
results.append((key, info))

assert len(results) == 1
assert results[0][0] == key1
assert results[0][1] is info1


@pytest.mark.parametrize('delta', [0, 1])
@freezegun.freeze_time('2020-01-01T00:00:00')
async def test_yielding_ignores_expired_items(mocker, delta):
future = datetime.datetime(2020, 1, 1, 0, 0, 0, 1)
past = datetime.datetime(2020, 1, 1) - datetime.timedelta(microseconds=delta)
key1 = VaultKey('some-key')
key2 = VaultKey('other-key')
info1 = ConnectionInfo(server='https://expected/', expiration=past)
info2 = ConnectionInfo(server='https://expected/', expiration=future)
vault = Vault()
mocker.patch.object(vault._ready, 'wait_for')

results = []
await vault.populate({key1: info1, key2: info2})
async for key, info in vault:
results.append((key, info))

assert len(results) == 1
assert results[0][0] == key2
assert results[0][1] is info2


@pytest.mark.parametrize('delta', [0, 1])
@freezegun.freeze_time('2020-01-01T00:00:00')
async def test_yielding_when_everything_is_expired(mocker, delta):
past = datetime.datetime(2020, 1, 1) - datetime.timedelta(microseconds=delta)
key1 = VaultKey('some-key')
info1 = ConnectionInfo(server='https://expected/', expiration=past)
vault = Vault()
mocker.patch.object(vault._ready, 'wait_for')

await vault.populate({key1: info1})
with pytest.raises(LoginError):
async for _, _ in vault:
pass


async def test_invalidation_reraises_if_nothing_is_left_with_exception(mocker):
exc = Exception("Sample error.")
key1 = VaultKey('some-key')
Expand Down

0 comments on commit 2551b8b

Please sign in to comment.