-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Enable OAuth/OIDC authentication with Bearer token
Handle OAuth/OIDC (Bearer token) auth headers, and use them when instantiating Kafka clients (with the exception of backups for now). Basic authentication behaviour is unchanged, just extracted and unittested.
- Loading branch information
Mátyás Kuti
committed
Oct 4, 2023
1 parent
8a4ff32
commit 310b273
Showing
6 changed files
with
231 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
""" | ||
Copyright (c) 2023 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
from __future__ import annotations | ||
|
||
from aiokafka.abc import AbstractTokenProvider as AbstractTokenProviderAsync | ||
from http import HTTPStatus | ||
from kafka.oauth.abstract import AbstractTokenProvider | ||
from karapace.config import Config | ||
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE | ||
from typing import NoReturn, TypedDict | ||
|
||
import aiohttp | ||
import dataclasses | ||
import enum | ||
|
||
|
||
@enum.unique | ||
class TokenType(enum.Enum): | ||
BASIC = "Basic" | ||
BEARER = "Bearer" | ||
|
||
|
||
def raise_unauthorized() -> NoReturn: | ||
raise HTTPResponse( | ||
body='{"message": "Unauthorized"}', | ||
status=HTTPStatus.UNAUTHORIZED, | ||
content_type=JSON_CONTENT_TYPE, | ||
headers={"WWW-Authenticate": 'Basic realm="Karapace REST Proxy"'}, | ||
) | ||
|
||
|
||
class SASLPlainConfig(TypedDict): | ||
sasl_mechanism: str | None | ||
sasl_plain_username: str | None | ||
sasl_plain_password: str | None | ||
|
||
|
||
class SASLOauthConfig(TypedDict): | ||
sasl_mechanism: str | None | ||
sasl_oauth_token: str | None | ||
|
||
|
||
def get_auth_config_from_header( | ||
auth_header: str | None, | ||
config: Config, | ||
) -> SASLPlainConfig | SASLOauthConfig: | ||
if auth_header is None: | ||
raise_unauthorized() | ||
|
||
token_type, _separator, token = auth_header.partition(" ") | ||
|
||
if token_type == TokenType.BEARER.value: | ||
return {"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": token} | ||
|
||
if token_type == TokenType.BASIC.value: | ||
basic_auth = aiohttp.BasicAuth.decode(auth_header) | ||
sasl_mechanism = config["sasl_mechanism"] | ||
if sasl_mechanism is None: | ||
sasl_mechanism = "PLAIN" | ||
|
||
return { | ||
"sasl_mechanism": sasl_mechanism, | ||
"sasl_plain_username": basic_auth.login, | ||
"sasl_plain_password": basic_auth.password, | ||
} | ||
|
||
raise_unauthorized() | ||
|
||
|
||
@dataclasses.dataclass | ||
class SimpleOauthTokenProvider(AbstractTokenProvider): | ||
_token: str | ||
|
||
def token(self) -> str: | ||
return self._token | ||
|
||
|
||
@dataclasses.dataclass | ||
class SimpleOauthTokenProviderAsync(AbstractTokenProviderAsync): | ||
_token: str | ||
|
||
async def token(self) -> str: | ||
return self._token | ||
|
||
|
||
class SASLOauthParams(TypedDict): | ||
sasl_mechanism: str | ||
sasl_oauth_token_provider: AbstractTokenProvider | AbstractTokenProviderAsync | ||
|
||
|
||
def get_kafka_client_auth_parameters_from_config( | ||
config: Config, | ||
*, | ||
async_client: bool = True, | ||
) -> SASLPlainConfig | SASLOauthParams: | ||
if config["sasl_mechanism"] == "OAUTHBEARER": | ||
token_provider_cls = SimpleOauthTokenProviderAsync if async_client else SimpleOauthTokenProvider | ||
return { | ||
"sasl_mechanism": config["sasl_mechanism"], | ||
"sasl_oauth_token_provider": token_provider_cls(config["sasl_oauth_token"]), | ||
} | ||
|
||
return { | ||
"sasl_mechanism": config["sasl_mechanism"], | ||
"sasl_plain_username": config["sasl_plain_username"], | ||
"sasl_plain_password": config["sasl_plain_password"], | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
""" | ||
Copyright (c) 2023 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
from http import HTTPStatus | ||
from karapace.config import ConfigDefaults, set_config_defaults | ||
from karapace.kafka_rest_apis.auth_utils import ( | ||
get_auth_config_from_header, | ||
get_kafka_client_auth_parameters_from_config, | ||
SimpleOauthTokenProvider, | ||
SimpleOauthTokenProviderAsync, | ||
) | ||
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE | ||
from typing import Optional | ||
|
||
import base64 | ||
import pytest | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"auth_header", | ||
(None, "Digest foo=bar"), | ||
) | ||
def test_get_auth_config_from_header_raises_unauthorized_on_invalid_header(auth_header: Optional[str]) -> None: | ||
config = set_config_defaults({}) | ||
|
||
with pytest.raises(HTTPResponse) as exc_info: | ||
get_auth_config_from_header(auth_header, config) | ||
|
||
http_resonse = exc_info.value | ||
assert http_resonse.body == '{"message": "Unauthorized"}' | ||
assert http_resonse.status == HTTPStatus.UNAUTHORIZED | ||
assert http_resonse.headers["Content-Type"] == JSON_CONTENT_TYPE | ||
assert http_resonse.headers["WWW-Authenticate"] == 'Basic realm="Karapace REST Proxy"' | ||
|
||
|
||
@pytest.mark.parametrize( | ||
("auth_header", "config_override", "expected_auth_config"), | ||
( | ||
( | ||
f"Basic {base64.b64encode(b'username:password').decode()}", | ||
{"sasl_mechanism": None}, | ||
{"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"}, | ||
), | ||
( | ||
f"Basic {base64.b64encode(b'username:password').decode()}", | ||
{"sasl_mechanism": "PLAIN"}, | ||
{"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"}, | ||
), | ||
( | ||
f"Basic {base64.b64encode(b'username:password').decode()}", | ||
{"sasl_mechanism": "SCRAM"}, | ||
{"sasl_mechanism": "SCRAM", "sasl_plain_username": "username", "sasl_plain_password": "password"}, | ||
), | ||
( | ||
"Bearer <TOKEN>", | ||
{}, | ||
{"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": "<TOKEN>"}, | ||
), | ||
), | ||
) | ||
def test_get_auth_config_from_header( | ||
auth_header: str, config_override: ConfigDefaults, expected_auth_config: ConfigDefaults | ||
) -> None: | ||
config = set_config_defaults(config_override) | ||
auth_config = get_auth_config_from_header(auth_header, config) | ||
assert auth_config == expected_auth_config | ||
|
||
|
||
def test_simple_oauth_token_provider_returns_configured_token() -> None: | ||
token_provider = SimpleOauthTokenProvider("TOKEN") | ||
assert token_provider.token() == "TOKEN" | ||
|
||
|
||
async def test_simple_oauth_token_provider_async_returns_configured_token() -> None: | ||
token_provider = SimpleOauthTokenProviderAsync("TOKEN") | ||
assert await token_provider.token() == "TOKEN" | ||
|
||
|
||
def test_get_client_auth_parameters_from_config_sasl_plain() -> None: | ||
config = set_config_defaults( | ||
{"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"} | ||
) | ||
|
||
client_auth_params = get_kafka_client_auth_parameters_from_config(config) | ||
|
||
assert client_auth_params == { | ||
"sasl_mechanism": "PLAIN", | ||
"sasl_plain_username": "username", | ||
"sasl_plain_password": "password", | ||
} | ||
|
||
|
||
def test_get_client_auth_parameters_from_config_oauth() -> None: | ||
config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": "TOKEN"}) | ||
|
||
client_auth_params = get_kafka_client_auth_parameters_from_config(config, async_client=False) | ||
|
||
assert client_auth_params["sasl_mechanism"] == "OAUTHBEARER" | ||
assert client_auth_params["sasl_oauth_token_provider"].token() == "TOKEN" | ||
|
||
|
||
async def test_get_client_auth_parameters_from_config_oauth_async() -> None: | ||
config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": "TOKEN"}) | ||
|
||
client_auth_params = get_kafka_client_auth_parameters_from_config(config, async_client=True) | ||
|
||
assert client_auth_params["sasl_mechanism"] == "OAUTHBEARER" | ||
assert await client_auth_params["sasl_oauth_token_provider"].token() == "TOKEN" |