From c6b887efb5f2577205fd32f0eafd6eaaf78114f0 Mon Sep 17 00:00:00 2001
From: cl0ete
Date: Thu, 17 Oct 2024 17:31:32 +0200
Subject: [PATCH] :bug: Fix endorser Nats connection going stale (#1122)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* add max attempts env
* :art:
* must be int
* reset to 15
* add temp debug log
* more temp debug logs
* dont reset nats
* revert reset nats
* remove temp debug logging
* add heartbeat to fetch
* import fetch time out error
* on fetch timeout continue
* on generic timeout re-subscribe
* update imports
* fix test
* add tests for health endpoints
* update health endpoints
* add check jetstream func for health checks
* fix test
* :art:
* update endorser health endpoint
* :sparkles: Implement DID Exchange and DID Rotate methods and 🗑️ deprecate connections protocol (#1119)
* :wastebasket: mark deprecated routes
* :construction: initial implementation of new did-exchange routes
* :art: modify default extra settings example
* :wrench: Update default pylint config
* :art: update docstrings and available optional params
* :construction: e2e test under construction
* :white_check_mark: fix up create request tests
* :bug: fix protocol: /1.1 doesn't work ...
* :bug: use_public_did must be false in accept-request
* :white_check_mark: working tests for accept-request
* :art: remove unused endpoint and update route names
* :art: update route names
* :art: accept reject reason sa body instead of param
* :arrow_up: Use latest cloudcontroller
* :arrow_up: Update lock files
* :sparkles: implement did-rotate endpoints
* :wrench: add max-positional-arguments to pylintrc
* :arrow_up: use latest cloudcontroller
* :arrow_up: Update lock files
* :sparkles: Use did-rotate/hangup in deletion of connection record (if using didexchange protocol)
* :white_check_mark: Fix deleting records when using oob connections
* :white_check_mark: assert connections are complete for both parties
* :white_check_mark: e2e tests for did-rotate
* :arrow_up: Helmfile `0.169`, Helm `3.16.2`, Tailscale `1.76.0` (#1123)
* :pushpin: Pin `xk6` and plugin versions (#1124)
* `xk6-sse` isn't compatible with `k6>=0.53`
* Pin `xk6` and all plugins to the latest compatible versions
* Also bump Golang to `1.23`
* :pushpin: Explicitly pin k6 to `v0.52.0` (#1125)
* :arrow_up: Update lock files
* :art:
* :white_check_mark: 100% unit test coverage for new did-exchange and did-rotate methods
* :white_check_mark: fixed up delete connection test
* :bug: Reconfigure ACAPY_AUTO_ACCEPT_REQUESTS for Faber after test completes
* :white_check_mark: add cleaning up of connection records, for regression fixtures not to get bloated

---------

Co-authored-by: Robbie Blaine <4052340+rblaine95@users.noreply.github.com>

* :poop: add sleep to avoid mysterious 500 error

---------

Co-authored-by: Mourits de Beer <31511766+ff137@users.noreply.github.com>
Co-authored-by: Robbie Blaine <4052340+rblaine95@users.noreply.github.com>
Co-authored-by: ff137
---
 .../onboarding/util/register_issuer_did.py |  5 +-
 app/tests/e2e/test_did_exchange.py         |  3 +
 docker-compose.yaml                        |  2 +-
 endorser/main.py                           | 33 ++++++-
 endorser/services/endorsement_processor.py | 25 ++++-
 endorser/tests/test_endorser_processor.py  |  3 +-
 endorser/tests/test_main.py                | 95 ++++++++++++++++++-
 7 files changed, 156 insertions(+), 10 deletions(-)

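Note on the fix: the core change is the pull-consumer fetch loop in
endorser/services/endorsement_processor.py: fetch with a heartbeat so a stale
connection surfaces as an error, treat FetchTimeoutError as "no messages yet", and
re-subscribe on a generic TimeoutError. Below is a minimal, self-contained sketch of
that pattern; the NATS URL, subject and durable name are illustrative placeholders,
not the endorser's actual configuration.

    import asyncio

    import nats
    from nats.errors import TimeoutError
    from nats.js.errors import FetchTimeoutError


    async def consume_forever() -> None:
        nc = await nats.connect("nats://localhost:4222")  # placeholder URL
        js = nc.jetstream()
        sub = await js.pull_subscribe("example.subject", durable="example-durable")

        while True:
            try:
                # The heartbeat keeps the request alive, so a dead connection shows
                # up as a timeout instead of a fetch that hangs indefinitely.
                messages = await sub.fetch(batch=1, timeout=60, heartbeat=1)
                for message in messages:
                    print(message.data.decode())  # placeholder for real handling
                    await message.ack()
            except FetchTimeoutError:
                # No messages within the timeout: benign, poll again.
                await asyncio.sleep(0.1)
            except TimeoutError:
                # Heartbeat/request timed out: assume the subscription went stale
                # and re-create it.
                await sub.unsubscribe()
                sub = await js.pull_subscribe(
                    "example.subject", durable="example-durable"
                )


    if __name__ == "__main__":
        asyncio.run(consume_forever())
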
diff --git a/app/services/onboarding/util/register_issuer_did.py b/app/services/onboarding/util/register_issuer_did.py
index 43e91ca0b..9d1fd63ba 100644
--- a/app/services/onboarding/util/register_issuer_did.py
+++ b/app/services/onboarding/util/register_issuer_did.py
@@ -1,4 +1,5 @@
 import asyncio
+import os
 from logging import Logger

 from aries_cloudcontroller import (
@@ -18,6 +19,8 @@
 )
 from shared import ACAPY_ENDORSER_ALIAS

+MAX_ATTEMPTS = int(os.getenv("WAIT_ISSUER_DID_MAX_ATTEMPTS", "15"))
+

 async def create_connection_with_endorser(
     *,
@@ -241,7 +244,7 @@ async def wait_issuer_did_transaction_endorsed(
     issuer_controller: AcaPyClient,
     issuer_connection_id: str,
     logger: Logger,
-    max_attempts: int = 15,
+    max_attempts: int = MAX_ATTEMPTS,
     retry_delay: float = 1.0,
 ) -> None:
     attempt = 0
diff --git a/app/tests/e2e/test_did_exchange.py b/app/tests/e2e/test_did_exchange.py
index d0d4849f7..eadc41f78 100644
--- a/app/tests/e2e/test_did_exchange.py
+++ b/app/tests/e2e/test_did_exchange.py
@@ -1,3 +1,4 @@
+import asyncio
 from typing import Optional

 import pytest
@@ -99,6 +100,7 @@ async def test_create_did_exchange_request(
             filter_map={"their_did": alice_did},
         )
     finally:
+        await asyncio.sleep(1)  # Short sleep assists in avoiding 500 error
         # Delete connection records:
         await alice_member_client.delete(
             f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
@@ -171,6 +173,7 @@ async def test_accept_did_exchange_invitation(
             filter_map={"connection_id": faber_connection_id},
         )
     finally:
+        await asyncio.sleep(1)  # Short sleep assists in avoiding 500 error
         # Delete connection records:
         await alice_member_client.delete(
             f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 9965ae94b..3e891e40d 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -230,7 +230,7 @@ services:
     env_file:
       - environments/endorser/endorser.env
     healthcheck:
-      test: ["CMD-SHELL", "curl -f http://localhost:3009/health || exit 1"]
+      test: ["CMD-SHELL", "curl -f http://localhost:3009/health/ready || exit 1"]
       interval: 30s
       timeout: 10s
       retries: 3
diff --git a/endorser/main.py b/endorser/main.py
index dac1404b3..d3cd3439d 100644
--- a/endorser/main.py
+++ b/endorser/main.py
@@ -1,3 +1,4 @@
+import asyncio
 import os
 from contextlib import asynccontextmanager

@@ -58,7 +59,7 @@ async def scalar_html():
     )


-@app.get("/health")
+@app.get("/health/live")
 @inject
 async def health_check(
     endorsement_processor: EndorsementProcessor = Depends(
@@ -71,3 +72,33 @@ async def health_check(
     raise HTTPException(
         status_code=503, detail="One or more background tasks are not running."
     )
+
+
+@app.get("/health/ready")
+@inject
+async def health_ready(
+    endorsement_processor: EndorsementProcessor = Depends(
+        Provide[Container.endorsement_processor]
+    ),
+):
+    try:
+        jetstream_status = await asyncio.wait_for(
+            endorsement_processor.check_jetstream(), timeout=5.0
+        )
+    except asyncio.TimeoutError:
+        raise HTTPException(
+            status_code=503,
+            detail={"status": "not ready", "error": "JetStream health check timed out"},
+        )
+    except Exception as e:  # pylint: disable=W0718
+        raise HTTPException(
+            status_code=500, detail={"status": "error", "error": str(e)}
+        )
+
+    if jetstream_status["is_working"]:
+        return {"status": "ready", "jetstream": jetstream_status}
+    else:
+        raise HTTPException(
+            status_code=503,
+            detail={"status": "not ready", "jetstream": "JetStream not ready"},
+        )
diff --git a/endorser/services/endorsement_processor.py b/endorser/services/endorsement_processor.py
index 95f8a405a..28d550614 100644
--- a/endorser/services/endorsement_processor.py
+++ b/endorser/services/endorsement_processor.py
@@ -4,6 +4,7 @@
 from aries_cloudcontroller import AcaPyClient
 from nats.errors import BadSubscriptionError, Error, TimeoutError
 from nats.js.client import JetStreamContext
+from nats.js.errors import FetchTimeoutError

 from endorser.util.endorsement import accept_endorsement, should_accept_endorsement
 from shared.constants import (
@@ -85,7 +86,7 @@ async def _process_endorsement_requests(self) -> NoReturn:
         subscription = await self._subscribe()
         while True:
             try:
-                messages = await subscription.fetch(batch=1, timeout=60)
+                messages = await subscription.fetch(batch=1, timeout=60, heartbeat=1)
                 for message in messages:
                     message_subject = message.subject
                     message_data = message.data.decode()
@@ -103,9 +104,13 @@
                         )
                     finally:
                         await message.ack()
-            except TimeoutError:
-                logger.trace("Timeout fetching messages continuing...")
+            except FetchTimeoutError:
+                logger.trace("FetchTimeoutError continuing...")
                 await asyncio.sleep(0.1)
+            except TimeoutError as e:
+                logger.warning("Timeout error fetching messages re-subscribing: {}", e)
+                await subscription.unsubscribe()
+                subscription = await self._subscribe()
             except Exception:  # pylint: disable=W0718
                 logger.exception("Unexpected error in endorsement processing loop")
                 await asyncio.sleep(2)
@@ -192,3 +197,17 @@ async def _subscribe(self) -> JetStreamContext.PullSubscription:

         logger.debug("Subscribed to NATS subject")
         return subscription
+
+    async def check_jetstream(self):
+        try:
+            account_info = await self.jetstream.account_info()
+            is_working = account_info.streams > 0
+            logger.trace("JetStream check completed. Is working: {}", is_working)
+            return {
+                "is_working": is_working,
+                "streams_count": account_info.streams,
+                "consumers_count": account_info.consumers,
+            }
+        except Exception:  # pylint: disable=W0718
+            logger.exception("Caught exception while checking jetstream status")
+            return {"is_working": False}
diff --git a/endorser/tests/test_endorser_processor.py b/endorser/tests/test_endorser_processor.py
index 999ad6670..d56dbd9f3 100644
--- a/endorser/tests/test_endorser_processor.py
+++ b/endorser/tests/test_endorser_processor.py
@@ -6,6 +6,7 @@
 from nats.aio.client import Client as NATS
 from nats.errors import BadSubscriptionError, Error, TimeoutError
 from nats.js.client import JetStreamContext
+from nats.js.errors import FetchTimeoutError

 from endorser.services.endorsement_processor import EndorsementProcessor
 from shared.constants import (
@@ -128,7 +129,7 @@ async def test_process_endorsement_requests_timeout(
     mock_nats_client.pull_subscribe.return_value = mock_subscription

     # Simulate a timeout, then a CancelledError to stop the loop
-    mock_subscription.fetch.side_effect = [TimeoutError, asyncio.CancelledError]
+    mock_subscription.fetch.side_effect = [FetchTimeoutError, asyncio.CancelledError]

     # Test
     with patch("asyncio.sleep") as mock_sleep:
diff --git a/endorser/tests/test_main.py b/endorser/tests/test_main.py
index 3021c8754..003d16325 100644
--- a/endorser/tests/test_main.py
+++ b/endorser/tests/test_main.py
@@ -1,9 +1,11 @@
+import asyncio
 from unittest.mock import AsyncMock, MagicMock, Mock, patch

 import pytest
 from fastapi import FastAPI, HTTPException

-from endorser.main import app, app_lifespan, health_check
+from endorser.main import app, app_lifespan, health_check, health_ready
+from endorser.services.endorsement_processor import EndorsementProcessor


 def test_create_app():
@@ -14,8 +16,9 @@ def test_create_app():

     # Get all routes in app
     routes = [route.path for route in app.routes]
-    expected_routes = "/health"
-    assert expected_routes in routes
+    expected_routes = ["/health/live", "/health/ready", "/docs"]
+    for route in expected_routes:
+        assert route in routes


 @pytest.mark.anyio
@@ -63,3 +66,89 @@ async def test_health_check_unhealthy():
         await health_check(endorsement_processor=endorsement_processor_mock)
     assert exc_info.value.status_code == 503
     assert exc_info.value.detail == "One or more background tasks are not running."
+
+
+@pytest.mark.anyio
+async def test_health_ready_success():
+    endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)
+
+    endorsement_processor_mock.check_jetstream.return_value = {
+        "is_working": True,
+        "streams_count": 1,
+        "consumers_count": 1,
+    }
+
+    response = await health_ready(endorsement_processor=endorsement_processor_mock)
+
+    assert response == {
+        "status": "ready",
+        "jetstream": {"is_working": True, "streams_count": 1, "consumers_count": 1},
+    }
+
+
+@pytest.mark.anyio
+async def test_health_ready_jetstream_not_working():
+    endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)
+
+    endorsement_processor_mock.check_jetstream.return_value = {
+        "is_working": False,
+        "error": "No streams available",
+    }
+
+    with pytest.raises(HTTPException) as exc_info:
+        await health_ready(endorsement_processor=endorsement_processor_mock)
+
+    assert exc_info.value.status_code == 503
+    assert exc_info.value.detail == {
+        "status": "not ready",
+        "jetstream": "JetStream not ready",
+    }
+
+
+@pytest.mark.anyio
+async def test_health_ready_timeout():
+    endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)
+
+    endorsement_processor_mock.check_jetstream.side_effect = asyncio.TimeoutError()
+
+    with pytest.raises(HTTPException) as exc_info:
+        await health_ready(endorsement_processor=endorsement_processor_mock)
+
+    assert exc_info.value.status_code == 503
+    assert exc_info.value.detail == {
+        "status": "not ready",
+        "error": "JetStream health check timed out",
+    }
+
+
+@pytest.mark.anyio
+async def test_health_ready_unexpected_error():
+    endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)
+
+    endorsement_processor_mock.check_jetstream.side_effect = Exception(
+        "Unexpected error"
+    )
+
+    with pytest.raises(HTTPException) as exc_info:
+        await health_ready(endorsement_processor=endorsement_processor_mock)
+
+    assert exc_info.value.status_code == 500
+    assert exc_info.value.detail == {"status": "error", "error": "Unexpected error"}
+
+
+@pytest.mark.anyio
+async def test_health_ready_with_timeout():
+    endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)
+
+    endorsement_processor_mock.check_jetstream.side_effect = (
+        asyncio.TimeoutError
+    )  # Simulate a slow response
+
+    with pytest.raises(HTTPException) as exc_info:
+        await health_ready(endorsement_processor=endorsement_processor_mock)
+
+    assert exc_info.value.status_code == 503
+    assert exc_info.value.detail == {
+        "status": "not ready",
+        "error": "JetStream health check timed out",
+    }
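
For reference, the two health endpoints now serve different purposes: /health/live
only reports whether the background endorsement tasks are running, while
/health/ready additionally calls check_jetstream() and is what the docker-compose
healthcheck now probes. A quick way to exercise both is sketched below, assuming the
endorser listens on localhost:3009 (as in docker-compose.yaml); httpx is an
assumption here, any HTTP client works.

    import httpx

    BASE = "http://localhost:3009"  # endorser port from docker-compose.yaml

    # Liveness: 200 while the background tasks are running, 503 otherwise.
    live = httpx.get(f"{BASE}/health/live")
    print("live:", live.status_code, live.json())

    # Readiness: 200 with JetStream stats when check_jetstream() reports a working
    # stream, 503 if JetStream is not ready or the check times out, 500 on an
    # unexpected error.
    ready = httpx.get(f"{BASE}/health/ready")
    print("ready:", ready.status_code, ready.json())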