REST proxy SASL OIDC authentication #731

Merged · 3 commits · Oct 13, 2023
18 changes: 18 additions & 0 deletions README.rst
@@ -565,6 +565,24 @@ Example of complete authorization file
]
}

OAuth2 authentication and authorization of Karapace REST proxy
===================================================================

The Karapace REST proxy supports passing OAuth2 credentials to the underlying Kafka service (defined in the ``sasl_bootstrap_uri`` configuration parameter). The JSON Web Token (JWT) is extracted from the ``Authorization`` HTTP header if the authorization scheme is ``Bearer``,
e.g. ``Authorization: Bearer $JWT``. If a ``Bearer`` token is present, the Kafka clients managed by Karapace are created with the SASL ``OAUTHBEARER`` mechanism and the JWT is passed along. The Karapace REST proxy does not verify the token itself; that is done by
the underlying Kafka service, provided it is configured accordingly.
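
For example, a minimal request passing a Bearer token through the proxy might look like the following (the host, port and ``$JWT`` value are placeholders for your deployment)::

    curl -H "Authorization: Bearer $JWT" http://localhost:8082/topics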

Authorization is also done by Kafka itself, typically using the ``sub`` claim of the JWT as the username (the claim used is configurable), checked against the configured ACLs.
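
As an illustration, if the broker maps the ``sub`` claim to the principal ``User:alice``, a matching ACL could be granted with Kafka's standard tooling (a sketch; the broker address, principal and topic names are hypothetical)::

    kafka-acls.sh --bootstrap-server kafka.example.com:9092 \
        --add --allow-principal User:alice \
        --operation Read --topic my-topic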

OAuth2 and ``Bearer`` token usage requires the ``rest_authorization`` configuration parameter to be set to ``true``.
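
A minimal configuration sketch enabling this behaviour (all values are placeholders for your deployment)::

    {
        "bootstrap_uri": "kafka.example.com:9092",
        "sasl_bootstrap_uri": "kafka.example.com:9093",
        "security_protocol": "SASL_SSL",
        "rest_authorization": true
    }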

Token expiry
------------

The REST proxy process manages a set of producer and consumer clients, which are identified by the OAuth2 JWT token. These are periodically cleaned up if they are idle, as well as *before* the JWT token expires (the cleanup currently runs every 5 minutes).

Before a client refreshes its OAuth2 JWT token, it is expected to remove the currently running consumers (e.g. after committing their offsets) and producers that use the soon-to-expire token.
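
For instance, a consumer instance created through the proxy could be removed before the token refresh via the standard consumer deletion endpoint (a sketch; host, group and instance names are hypothetical)::

    curl -X DELETE -H "Authorization: Bearer $JWT" \
        http://localhost:8082/consumers/my-group/instances/my-consumer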

Uninstall
=========

2 changes: 2 additions & 0 deletions karapace/config.py
@@ -62,6 +62,7 @@ class Config(TypedDict):
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
sasl_oauth_token: str | None
topic_name: str
metadata_max_age_ms: int
admin_metadata_max_age: int
@@ -131,6 +132,7 @@ class ConfigDefaults(Config, total=False):
"sasl_mechanism": None,
"sasl_plain_username": None,
SASL_PLAIN_PASSWORD: None,
"sasl_oauth_token": None,
"topic_name": DEFAULT_SCHEMA_TOPIC,
"metadata_max_age_ms": 60000,
"admin_metadata_max_age": 5,
63 changes: 38 additions & 25 deletions karapace/kafka_rest_apis/__init__.py
@@ -5,6 +5,7 @@
from contextlib import AsyncExitStack, closing
from http import HTTPStatus
from kafka.errors import (
AuthenticationFailedError,
BrokerResponseError,
KafkaTimeoutError,
NoBrokersAvailable,
@@ -15,21 +16,26 @@
from karapace.config import Config, create_client_ssl_context
from karapace.errors import InvalidSchema
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
get_kafka_client_auth_parameters_from_config,
)
from karapace.kafka_rest_apis.consumer_manager import ConsumerManager
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache
from karapace.karapace import KarapaceBase
from karapace.rapu import HTTPRequest, HTTPResponse, JSON_CONTENT_TYPE
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE
from karapace.schema_models import TypedSchema, ValidatedTypedSchema
from karapace.schema_type import SchemaType
from karapace.serialization import InvalidMessageSchema, InvalidPayload, SchemaRegistrySerializer, SchemaRetrievalError
from karapace.typing import SchemaId, Subject
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
from typing import Callable, Dict, List, Optional, Tuple, Union

import aiohttp.web
import asyncio
import base64
import datetime
import logging
import time

@@ -41,6 +47,7 @@
SCHEMA_MAPPINGS = {"avro": SchemaType.AVRO, "jsonschema": SchemaType.JSONSCHEMA, "protobuf": SchemaType.PROTOBUF}
TypedConsumer = namedtuple("TypedConsumer", ["consumer", "serialization_format", "config"])
IDLE_PROXY_TIMEOUT = 5 * 60
AUTH_EXPIRY_TOLERANCE = datetime.timedelta(seconds=IDLE_PROXY_TIMEOUT)

log = logging.getLogger(__name__)

@@ -86,6 +93,13 @@ async def _disconnect_idle_proxy_if_any(self) -> None:
async with self._proxy_lock:
# Always clean one at time, don't mutate dict while iterating
for _key, _proxy in self.proxies.items():
# In case of an OAuth2/OIDC token, the proxy is to be cleaned up _before_ the token expires
# If the token is still valid within the tolerance time, idleness is still checked
now = datetime.datetime.now(datetime.timezone.utc)
if _proxy.auth_expiry and _proxy.auth_expiry < now + AUTH_EXPIRY_TOLERANCE:
key, proxy = _key, _proxy
log.warning("Releasing unused connection for %s due to token expiry at %s", _proxy, _proxy.auth_expiry)
break
# If UserRestProxy has consumers with state, disconnecting loses state
if _proxy.num_consumers() > 0:
if idle_consumer_timeout > 0 and _proxy.last_used + idle_consumer_timeout < time.monotonic():
@@ -271,33 +285,25 @@ async def get_user_proxy(self, request: HTTPRequest) -> "UserRestProxy":
try:
if self.config.get("rest_authorization", False):
auth_header = request.headers.get("Authorization")
auth_config = get_auth_config_from_header(auth_header, self.config)
auth_expiry = get_expiration_time_from_header(auth_header)

if auth_header is None:
raise HTTPResponse(
body='{"message": "Unauthorized"}',
status=HTTPStatus.UNAUTHORIZED,
content_type=JSON_CONTENT_TYPE,
headers={"WWW-Authenticate": 'Basic realm="Karapace REST Proxy"'},
)
key = auth_header
if self.proxies.get(key) is None:
auth = aiohttp.BasicAuth.decode(auth_header)
config = self.config.copy()
config["bootstrap_uri"] = config["sasl_bootstrap_uri"]
config["security_protocol"] = (
"SASL_SSL" if config["security_protocol"] in ("SSL", "SASL_SSL") else "SASL_PLAINTEXT"
)
if config["sasl_mechanism"] is None:
config["sasl_mechanism"] = "PLAIN"
config["sasl_plain_username"] = auth.login
config["sasl_plain_password"] = auth.password
self.proxies[key] = UserRestProxy(config, self.kafka_timeout, self.serializer)
config.update(auth_config)
self.proxies[key] = UserRestProxy(config, self.kafka_timeout, self.serializer, auth_expiry)
else:
if self.proxies.get(key) is None:
self.proxies[key] = UserRestProxy(self.config, self.kafka_timeout, self.serializer)
except NoBrokersAvailable:
# This can be caused also due misconfigration, but kafka-python's
# KafkaAdminClient cannot currently distinguish those two cases
except (NoBrokersAvailable, AuthenticationFailedError):
# NoBrokersAvailable can also be caused by misconfiguration, but kafka-python's
# KafkaAdminClient cannot currently distinguish those two cases.
# A more expressive AuthenticationFailedError is raised in case of OAuth2.
log.exception("Failed to connect to Kafka with the credentials")
self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN)
proxy = self.proxies[key]
@@ -408,7 +414,13 @@ async def topic_publish(self, topic: str, content_type: str, *, request: HTTPReq


class UserRestProxy:
def __init__(self, config: Config, kafka_timeout: int, serializer):
def __init__(
self,
config: Config,
kafka_timeout: int,
serializer: SchemaRegistrySerializer,
auth_expiry: Optional[datetime.datetime] = None,
):
self.config = config
self.kafka_timeout = kafka_timeout
self.serializer = serializer
Expand All @@ -423,6 +435,7 @@ def __init__(self, config: Config, kafka_timeout: int, serializer):
self.consumer_manager = ConsumerManager(config=config, deserializer=self.serializer)
self.init_admin_client()
self._last_used = time.monotonic()
self._auth_expiry = auth_expiry

self._async_producer_lock = asyncio.Lock()
self._async_producer: Optional[AIOKafkaProducer] = None
@@ -437,6 +450,10 @@ def last_used(self) -> int:
def mark_used(self) -> None:
self._last_used = time.monotonic()

@property
def auth_expiry(self) -> Optional[datetime.datetime]:
return self._auth_expiry

def num_consumers(self) -> int:
return len(self.consumer_manager.consumers)

@@ -471,9 +488,7 @@ async def _maybe_create_async_producer(self) -> AIOKafkaProducer:
metadata_max_age_ms=self.config["metadata_max_age_ms"],
security_protocol=self.config["security_protocol"],
ssl_context=ssl_context,
sasl_mechanism=self.config["sasl_mechanism"],
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
**get_kafka_client_auth_parameters_from_config(self.config),
)

try:
@@ -626,13 +641,11 @@ def init_admin_client(self):
ssl_cafile=self.config["ssl_cafile"],
ssl_certfile=self.config["ssl_certfile"],
ssl_keyfile=self.config["ssl_keyfile"],
sasl_mechanism=self.config["sasl_mechanism"],
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
api_version=(1, 0, 0),
metadata_max_age_ms=self.config["metadata_max_age_ms"],
connections_max_idle_ms=self.config["connections_max_idle_ms"],
kafka_client=KarapaceKafkaClient,
**get_kafka_client_auth_parameters_from_config(self.config, async_client=False),
)
break
except: # pylint: disable=bare-except
165 changes: 165 additions & 0 deletions karapace/kafka_rest_apis/authentication.py
@@ -0,0 +1,165 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from aiokafka.abc import AbstractTokenProvider as AbstractTokenProviderAsync
from http import HTTPStatus
from kafka.oauth.abstract import AbstractTokenProvider
from karapace.config import Config
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE
from typing import NoReturn, TypedDict

import aiohttp
import dataclasses
import datetime
import enum
import jwt


@enum.unique
class TokenType(enum.Enum):
BASIC = "Basic"
BEARER = "Bearer"


def raise_unauthorized() -> NoReturn:
raise HTTPResponse(
body='{"message": "Unauthorized"}',
status=HTTPStatus.UNAUTHORIZED,
content_type=JSON_CONTENT_TYPE,
headers={"WWW-Authenticate": 'Basic realm="Karapace REST Proxy"'},
)


class SASLPlainConfig(TypedDict):
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None


class SASLOauthConfig(TypedDict):
sasl_mechanism: str | None
sasl_oauth_token: str | None


def _split_auth_header(auth_header: str) -> tuple[str, str]:
token_type, _separator, token = auth_header.partition(" ")
return (token_type, token)


def get_auth_config_from_header(
auth_header: str | None,
config: Config,
) -> SASLPlainConfig | SASLOauthConfig:
"""Verify the given Authorization HTTP header and construct config parameters based on it.

If the Authorization header is `None` or of an unknown scheme, an Unauthorized HTTP response is raised.
The known authentication schemes are `Bearer` and `Basic`.

:param auth_header: The Authorization header extracted from an HTTP request
:param config: Current config of Karapace, necessary to decide on the SASL mechanism
"""
if auth_header is None:
raise_unauthorized()

token_type, token = _split_auth_header(auth_header)

if token_type == TokenType.BEARER.value:
return {"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": token}

if token_type == TokenType.BASIC.value:
basic_auth = aiohttp.BasicAuth.decode(auth_header)
sasl_mechanism = config["sasl_mechanism"]
if sasl_mechanism is None:
sasl_mechanism = "PLAIN"

return {
"sasl_mechanism": sasl_mechanism,
"sasl_plain_username": basic_auth.login,
"sasl_plain_password": basic_auth.password,
}

raise_unauthorized()


def get_expiration_time_from_header(auth_header: str) -> datetime.datetime | None:
"""Extract expiration from Authorization HTTP header.

In case of an OAuth Bearer token, the `exp` claim is extracted and returned as a
`datetime.datetime` object. Otherwise the authentication method is assumed to be
Basic, which implies no expiry of the credentials.

The token's signature is not verified here; that is done by the Kafka clients that
use it, which discard the token in case of any issues.

:param auth_header: The Authorization header extracted from an HTTP request
"""
token_type, token = _split_auth_header(auth_header)

if token_type == TokenType.BEARER.value:
exp_claim = jwt.decode(token, options={"verify_signature": False}).get("exp")
if exp_claim is not None:
return datetime.datetime.fromtimestamp(exp_claim, datetime.timezone.utc)

return None


@dataclasses.dataclass
class SimpleOauthTokenProvider(AbstractTokenProvider):
"""A pass-through OAuth token provider to be used by synchronous Kafka clients.

The token is meant to be extracted from an HTTP Authorization header.
"""

_token: str

def token(self) -> str:
return self._token


@dataclasses.dataclass
class SimpleOauthTokenProviderAsync(AbstractTokenProviderAsync):
"""A pass-through OAuth token provider to be used by asynchronous Kafka clients.

The token is meant to be extracted from an HTTP Authorization header.
"""

_token: str

async def token(self) -> str:
return self._token


class SASLOauthParams(TypedDict):
sasl_mechanism: str
sasl_oauth_token_provider: AbstractTokenProvider | AbstractTokenProviderAsync


def get_kafka_client_auth_parameters_from_config(
config: Config,
*,
async_client: bool = True,
) -> SASLPlainConfig | SASLOauthParams:
"""Create authentication parameters for a Kafka client based on the Karapace config.

If the `OAUTHBEARER` SASL mechanism is present in the config, the OAuth token
provider needed by the Kafka client is created; the `async_client` parameter
decides whether this provider is sync or async.

:param config: Current config of Karapace
:param async_client: Flag to indicate whether the Kafka client using the returned parameters is async
"""
if config["sasl_mechanism"] == "OAUTHBEARER":
token_provider_cls = SimpleOauthTokenProviderAsync if async_client else SimpleOauthTokenProvider
return {
"sasl_mechanism": config["sasl_mechanism"],
"sasl_oauth_token_provider": token_provider_cls(config["sasl_oauth_token"]),
}

return {
"sasl_mechanism": config["sasl_mechanism"],
"sasl_plain_username": config["sasl_plain_username"],
"sasl_plain_password": config["sasl_plain_password"],
}
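
A minimal usage sketch of the helper above, assuming a Karapace config whose ``sasl_mechanism`` is ``OAUTHBEARER`` and whose ``sasl_oauth_token`` holds the raw JWT (``create_producer`` is a hypothetical caller; the helper and config keys follow the code in this PR):

from aiokafka import AIOKafkaProducer

from karapace.config import Config
from karapace.kafka_rest_apis.authentication import get_kafka_client_auth_parameters_from_config


async def create_producer(config: Config) -> AIOKafkaProducer:
    # The helper returns either SASL/PLAIN credentials or an OAuth token
    # provider, so the caller does not need to special-case the mechanism.
    producer = AIOKafkaProducer(
        bootstrap_servers=config["bootstrap_uri"],
        security_protocol=config["security_protocol"],
        **get_kafka_client_auth_parameters_from_config(config),
    )
    await producer.start()
    return producer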
5 changes: 2 additions & 3 deletions karapace/kafka_rest_apis/consumer_manager.py
@@ -10,6 +10,7 @@
from kafka.errors import GroupAuthorizationFailedError, IllegalStateError, KafkaConfigurationError, KafkaError
from kafka.structs import TopicPartition
from karapace.config import Config, create_client_ssl_context
from karapace.kafka_rest_apis.authentication import get_kafka_client_auth_parameters_from_config
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.karapace import empty_response, KarapaceBase
from karapace.serialization import DeserializationError, InvalidMessageHeader, InvalidPayload, SchemaRegistrySerializer
@@ -205,9 +206,6 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
client_id=internal_name,
security_protocol=self.config["security_protocol"],
ssl_context=ssl_context,
sasl_mechanism=self.config["sasl_mechanism"],
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
group_id=group_name,
fetch_min_bytes=max(1, fetch_min_bytes), # Discard earlier negative values
fetch_max_bytes=self.config["consumer_request_max_bytes"],
@@ -218,6 +216,7 @@
enable_auto_commit=request_data["auto.commit.enable"],
auto_offset_reset=request_data["auto.offset.reset"],
session_timeout_ms=session_timeout_ms,
**get_kafka_client_auth_parameters_from_config(self.config),
)
await c.start()
return c