Skip to content

Commit

Permalink
Release idle resources and disconnect idle consumers after timeout
Browse files Browse the repository at this point in the history
Release idle UserRestProxy instances to free resources not in use as
these can be recreated anytime.  ALso make it configurable to
disconnect idle consumers under UserRestProxy after longer
inactivity.
  • Loading branch information
tvainika committed Oct 4, 2022
1 parent e4e3dac commit c0d67cb
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 55 deletions.
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[MASTER]
jobs=4
init-hook='import sys; sys.path.append(".")'

[MESSAGES CONTROL]
enable=
Expand Down
3 changes: 3 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
* - ``consumer_request_max_bytes``
- ``67108864``
- Rest proxy consumers maximum bytes to be fetched per request
* - ``consumer_idle_disconnect_timeout``
- ``0``
- Disconnect idle consumers after timeout seconds if not used. Inactivity leads to consumer leaving consumer group and consumer state. 0 (default) means no auto-disconnect.
* - ``fetch_min_bytes``
- ``-1``
- Rest proxy consumers minimum bytes to be fetched per request. ``-1`` means no limit
Expand Down
1 change: 1 addition & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"consumer_enable_auto_commit": True,
"consumer_request_timeout_ms": 11000,
"consumer_request_max_bytes": 67108864,
"consumer_idle_disconnect_timeout": 0,
"fetch_min_bytes": -1,
"group_id": "schema-registry",
"host": "127.0.0.1",
Expand Down
167 changes: 113 additions & 54 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
OFFSET_RESET_STRATEGIES = {"latest", "earliest"}
SCHEMA_MAPPINGS = {"avro": SchemaType.AVRO, "jsonschema": SchemaType.JSONSCHEMA, "protobuf": SchemaType.PROTOBUF}
TypedConsumer = namedtuple("TypedConsumer", ["consumer", "serialization_format", "config"])
IDLE_PROXY_TIMEOUT = 5 * 60

log = logging.getLogger(__name__)

Expand All @@ -51,16 +52,54 @@ def __init__(self, config: Config) -> None:
self._add_kafka_rest_routes()
self.serializer = SchemaRegistrySerializer(config=config)
self.proxies: Dict[str, "UserRestProxy"] = {}
self._proxy_lock = asyncio.Lock()
log.info("REST proxy starting with (delegated authorization=%s)", self.config.get("rest_authorization", False))
self._idle_proxy_janitor_task: Optional[asyncio.Task] = None

async def close(self) -> None:
if self._idle_proxy_janitor_task is not None:
self._idle_proxy_janitor_task.cancel()
self._idle_proxy_janitor_task = None
async with AsyncExitStack() as stack:
stack.push_async_callback(super().close)
stack.push_async_callback(self.serializer.close)

for proxy in self.proxies.values():
stack.push_async_callback(proxy.aclose)

async def _idle_proxy_janitor(self) -> None:
while True:
await asyncio.sleep(IDLE_PROXY_TIMEOUT / 2)

try:
await self._disconnect_idle_proxy_if_any()
except: # pylint: disable=bare-except
log.exception("Disconnecting idle proxy failure")

async def _disconnect_idle_proxy_if_any(self) -> None:
idle_consumer_timeout = self.config.get("consumer_idle_disconnect_timeout", 0)

key, proxy = None, None
async with self._proxy_lock:
# Always clean one at time, don't mutate dict while iterating
for _key, _proxy in self.proxies.items():
# If UserRestProxy has consumers with state, disconnecting loses state
if _proxy.num_consumers() > 0:
if idle_consumer_timeout > 0 and _proxy.last_used + idle_consumer_timeout < time.monotonic():
key, proxy = _key, _proxy
log.warning("Disconnecting idle consumers for %s", _proxy)
break
# If there are no consumers, connection can be recreated without losing any state
else:
if _proxy.last_used + IDLE_PROXY_TIMEOUT < time.monotonic():
key, proxy = _key, _proxy
log.info("Releasing unused connection for %s", _proxy)
break
if key is not None:
del self.proxies[key]
if proxy is not None:
await proxy.aclose()

def _add_kafka_rest_routes(self) -> None:
# Brokers
self.route(
Expand Down Expand Up @@ -220,141 +259,147 @@ def _add_kafka_rest_routes(self) -> None:
)
self.route("/topics/<topic:path>", callback=self.topic_publish, method="POST", rest_request=True, with_request=True)

def get_user_proxy(self, request: HTTPRequest) -> "UserRestProxy":
async def get_user_proxy(self, request: HTTPRequest) -> "UserRestProxy":
key = ""
try:
if self.config.get("rest_authorization", False):
auth_header = request.headers.get("Authorization")

if auth_header is None:
raise HTTPResponse(
body='{"message": "Unauthorized"}',
status=HTTPStatus.UNAUTHORIZED,
content_type=JSON_CONTENT_TYPE,
headers={"WWW-Authenticate": 'Basic realm="Karapace REST Proxy"'},
)
key = auth_header
if self.proxies.get(key) is None:
auth = aiohttp.BasicAuth.decode(auth_header)
config = self.config.copy()
config["bootstrap_uri"] = config["sasl_bootstrap_uri"]
config["security_protocol"] = (
"SASL_SSL" if config["security_protocol"] in ("SSL", "SASL_SSL") else "SASL_PLAINTEXT"
)
config["sasl_mechanism"] = "PLAIN"
config["sasl_plain_username"] = auth.login
config["sasl_plain_password"] = auth.password
self.proxies[key] = UserRestProxy(config, self.kafka_timeout, self.serializer)
else:
if self.proxies.get(key) is None:
self.proxies[key] = UserRestProxy(self.config, self.kafka_timeout, self.serializer)
except NoBrokersAvailable:
# This can be caused also due misconfigration, but kafka-python's
# KafkaAdminClient cannot currently distinguish those two cases
log.exception("Failed to connect to Kafka with the credentials")
self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN)
return self.proxies[key]
async with self._proxy_lock:
if self._idle_proxy_janitor_task is None:
self._idle_proxy_janitor_task = asyncio.create_task(self._idle_proxy_janitor())

try:
if self.config.get("rest_authorization", False):
auth_header = request.headers.get("Authorization")

if auth_header is None:
raise HTTPResponse(
body='{"message": "Unauthorized"}',
status=HTTPStatus.UNAUTHORIZED,
content_type=JSON_CONTENT_TYPE,
headers={"WWW-Authenticate": 'Basic realm="Karapace REST Proxy"'},
)
key = auth_header
if self.proxies.get(key) is None:
auth = aiohttp.BasicAuth.decode(auth_header)
config = self.config.copy()
config["bootstrap_uri"] = config["sasl_bootstrap_uri"]
config["security_protocol"] = (
"SASL_SSL" if config["security_protocol"] in ("SSL", "SASL_SSL") else "SASL_PLAINTEXT"
)
config["sasl_mechanism"] = "PLAIN"
config["sasl_plain_username"] = auth.login
config["sasl_plain_password"] = auth.password
self.proxies[key] = UserRestProxy(config, self.kafka_timeout, self.serializer)
else:
if self.proxies.get(key) is None:
self.proxies[key] = UserRestProxy(self.config, self.kafka_timeout, self.serializer)
except NoBrokersAvailable:
# This can be caused also due misconfigration, but kafka-python's
# KafkaAdminClient cannot currently distinguish those two cases
log.exception("Failed to connect to Kafka with the credentials")
self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN)
proxy = self.proxies[key]
proxy.mark_used()
return proxy

async def list_brokers(self, content_type: str, *, request: HTTPRequest):
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.list_brokers(content_type)

async def commit_consumer_offsets(
self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest
) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.commit_consumer_offsets(group_name, instance, content_type, request=request)

async def get_consumer_offsets(self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.get_consumer_offsets(group_name, instance, content_type, request=request)

async def update_consumer_subscription(
self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest
) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.update_consumer_subscription(group_name, instance, content_type, request=request)

async def get_consumer_subscription(
self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest
) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.get_consumer_subscription(group_name, instance, content_type)

async def delete_consumer_subscription(
self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest
) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.delete_consumer_subscription(group_name, instance, content_type)

async def update_consumer_assignment(
self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest
) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.update_consumer_assignment(group_name, instance, content_type, request=request)

async def get_consumer_assignment(
self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest
) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.get_consumer_assignment(group_name, instance, content_type)

async def seek_beginning_consumer_offsets(
self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest
) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.seek_beginning_consumer_offsets(group_name, instance, content_type, request=request)

async def seek_end_consumer_offsets(
self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest
) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.seek_end_consumer_offsets(group_name, instance, content_type, request=request)

async def update_consumer_offsets(
self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest
) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.update_consumer_offsets(group_name, instance, content_type, request=request)

async def fetch(self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.fetch(group_name, instance, content_type, request=request)

async def create_consumer(self, group_name: str, content_type: str, *, request: HTTPRequest) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.create_consumer(group_name, content_type, request=request)

async def delete_consumer(self, group_name: str, instance: str, content_type: str, *, request: HTTPRequest) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.delete_consumer(group_name, instance, content_type)

async def partition_offsets(self, content_type: str, *, topic: str, partition_id: str, request: HTTPRequest):
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.partition_offsets(content_type, topic=topic, partition_id=partition_id)

async def partition_details(self, content_type: str, *, topic: str, partition_id: str, request: HTTPRequest):
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.partition_details(content_type, topic=topic, partition_id=partition_id)

async def partition_publish(self, topic: str, partition_id: str, content_type: str, *, request: HTTPRequest) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.partition_publish(topic, partition_id, content_type, request=request)

async def list_partitions(self, content_type: str, *, topic: str, request: HTTPRequest):
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.list_partitions(content_type, topic=topic)

async def list_topics(self, content_type: str, *, request: HTTPRequest):
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.list_topics(content_type)

async def topic_details(self, content_type: str, *, topic: str, request: HTTPRequest):
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.topic_details(content_type, topic=topic)

async def topic_publish(self, topic: str, content_type: str, *, request: HTTPRequest) -> None:
proxy = self.get_user_proxy(request)
proxy = await self.get_user_proxy(request)
await proxy.topic_publish(topic, content_type, request=request)


Expand All @@ -373,10 +418,24 @@ def __init__(self, config: Config, kafka_timeout: int, serializer):
self.schemas_cache = {}
self.consumer_manager = ConsumerManager(config=config, deserializer=self.serializer)
self.init_admin_client()
self._last_used = time.monotonic()

self._async_producer_lock = asyncio.Lock()
self._async_producer: Optional[AIOKafkaProducer] = None

def __str__(self) -> str:
return f"UserRestProxy(username={self.config['sasl_plain_username']})"

@property
def last_used(self) -> int:
return self._last_used

def mark_used(self) -> None:
self._last_used = time.monotonic()

def num_consumers(self) -> int:
return len(self.consumer_manager.consumers)

async def _maybe_create_async_producer(self) -> AIOKafkaProducer:
if self._async_producer is not None:
return self._async_producer
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async def test_internal(rest_async, admin_client):
[b"key", b"value", 0],
[b"key", b"value", None],
]
rest_async_proxy = rest_async.get_user_proxy(None)
rest_async_proxy = await rest_async.get_user_proxy(None)
results = await rest_async_proxy.produce_messages(topic=topic_name, prepared_records=prepared_records)
assert len(results) == 2
for result in results:
Expand Down
Loading

0 comments on commit c0d67cb

Please sign in to comment.