diff --git a/.github/workflows/continous-integration.yml b/.github/workflows/continous-integration.yml index 587ad2ad26e4..7a69bbb4e807 100644 --- a/.github/workflows/continous-integration.yml +++ b/.github/workflows/continous-integration.yml @@ -618,10 +618,6 @@ jobs: POSTGRES_PORT: 5432 POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres - RABBITMQ_HOST: localhost - RABBITMQ_PORT: 5672 - RABBITMQ_USER: guest - RABBITMQ_PASSWORD: guest services: redis: @@ -653,12 +649,6 @@ jobs: # mapping container ports to the host - 5432:5432 - rabbitmq: - # see https://github.com/docker-library/healthcheck/blob/master/rabbitmq/docker-healthcheck - image: healthcheck/rabbitmq - ports: - - 5672:5672 - mongodb: image: mongodb/mongodb-community-server:6.0.4-ubuntu2204 options: >- @@ -728,6 +718,94 @@ jobs: if grep 'The lock file is not up to date' .output; then exit 1; fi make prepare-tests-ubuntu + - name: Test Code with Services 🩺 + if: needs.changes.outputs.backend == 'true' + env: + JOBS: 2 + INTEGRATION_TEST_PYTEST_MARKERS: '"(not sequential) and (not broker)"' + PYTHONIOENCODING: "utf-8" + run: | + make test-integration + + broker_integration_test: + name: Run Broker Integration Tests + if: github.ref_type != 'tag' + runs-on: ubuntu-22.04 + timeout-minutes: 60 + needs: [changes] + env: + RABBITMQ_HOST: localhost + RABBITMQ_PORT: 5672 + RABBITMQ_USER: guest + RABBITMQ_PASSWORD: guest + + services: + rabbitmq: + # see https://github.com/docker-library/healthcheck/blob/master/rabbitmq/docker-healthcheck + image: healthcheck/rabbitmq + ports: + - 5672:5672 + + steps: + - name: Checkout git repository 🕝 + if: needs.changes.outputs.backend == 'true' + uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c + + - name: Set up Python ${{ env.DEFAULT_PYTHON_VERSION }} 🐍 + if: needs.changes.outputs.backend == 'true' + uses: actions/setup-python@57ded4d7d5e986d7296eab16560982c6dd7c923b + with: + python-version: ${{ env.DEFAULT_PYTHON_VERSION }} + + - name: Read Poetry Version 🔢 + if: needs.changes.outputs.backend == 'true' + run: | + echo "POETRY_VERSION=$(scripts/poetry-version.sh)" >> $GITHUB_ENV + shell: bash + + - name: Install poetry 🦄 + if: needs.changes.outputs.backend == 'true' + uses: Gr1N/setup-poetry@15821dc8a61bc630db542ae4baf6a7c19a994844 # v8 + with: + poetry-version: ${{ env.POETRY_VERSION }} + + - name: Load Poetry Cached Libraries ⬇ + id: cache-poetry + if: needs.changes.outputs.backend == 'true' + uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 + with: + path: .venv + key: ${{ runner.os }}-poetry-${{ env.POETRY_VERSION }}-${{ env.DEFAULT_PYTHON_VERSION }}-${{ hashFiles('**/poetry.lock') }}-venv-${{ secrets.POETRY_CACHE_VERSION }}-${{ env.pythonLocation }} + + - name: Clear Poetry cache + if: steps.cache-poetry.outputs.cache-hit == 'true' && needs.changes.outputs.backend == 'true' && contains(github.event.pull_request.labels.*.name, 'tools:clear-poetry-cache-unit-tests') + run: rm -r .venv + + # Poetry >= 1.1.0b uses virtualenv to create a virtual environment. + # The virtualenv simply doesn't work on Windows with our setup, + # that's why we use venv to create virtual environment + - name: Create virtual environment + if: (steps.cache-poetry.outputs.cache-hit != 'true' || contains(github.event.pull_request.labels.*.name, 'tools:clear-poetry-cache-unit-tests')) && needs.changes.outputs.backend == 'true' + run: python -m venv create .venv + + - name: Set up virtual environment + if: needs.changes.outputs.backend == 'true' + # Poetry on Windows cannot pick up the virtual environments directory properly, + # and it creates a new one every time the pipeline runs. + # This step solves this problem — it tells poetry to always use `.venv` directory inside + # the project itself, which also makes it easier for us to determine the correct directory + # that needs to be cached. + run: poetry config virtualenvs.in-project true + + - name: Install Dependencies (Linux) 📦 + if: needs.changes.outputs.backend == 'true' + run: | + sudo apt-get -y install libpq-dev + make install-full | tee .output + if grep 'The lock file is not up to date' .output; then exit 1; fi + make prepare-tests-ubuntu + make prepare-spacy + - name: Run kafka and zookeeper containers for integration testing if: needs.changes.outputs.backend == 'true' run: | @@ -737,11 +815,16 @@ jobs: if: needs.changes.outputs.backend == 'true' env: JOBS: 2 - INTEGRATION_TEST_PYTEST_MARKERS: '"not sequential"' + INTEGRATION_TEST_PYTEST_MARKERS: "broker" PYTHONIOENCODING: "utf-8" run: | make test-integration + - name: Stop kafka and zookeeper containers for integration testing + if: needs.changes.outputs.backend == 'true' + run: | + docker-compose -f tests_deployment/docker-compose.kafka.yml down + sequential_integration_test: name: Run Sequential Integration Tests if: github.ref_type != 'tag' @@ -841,11 +924,6 @@ jobs: run: | make test-integration - - name: Stop kafka and zookeeper containers for integration testing - if: needs.changes.outputs.backend == 'true' - run: | - docker-compose -f tests_deployment/docker-compose.kafka.yml down - build_docker_base_images_and_set_env: name: Build Docker base images and setup environment runs-on: ubuntu-22.04 diff --git a/Makefile b/Makefile index 23b5799e1d5f..62367bc17c4f 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ JOBS ?= 1 INTEGRATION_TEST_FOLDER = tests/integration_tests/ -INTEGRATION_TEST_PYTEST_MARKERS ?= "sequential or not sequential" +INTEGRATION_TEST_PYTEST_MARKERS ?= "sequential or broker or ((not sequential) and (not broker))" PLATFORM ?= "linux/amd64" help: diff --git a/changelog/13017.bugfix.md b/changelog/13017.bugfix.md new file mode 100644 index 000000000000..654e62aafa2c --- /dev/null +++ b/changelog/13017.bugfix.md @@ -0,0 +1 @@ +Flush messages when Kafka producer is closed. This is to ensure that all messages in the producer's internal queue are sent to the broker. diff --git a/docs/docs/docker/building-in-docker.mdx b/docs/docs/docker/building-in-docker.mdx index a6027c51f7e2..9866ad95e063 100644 --- a/docs/docs/docker/building-in-docker.mdx +++ b/docs/docs/docker/building-in-docker.mdx @@ -67,7 +67,7 @@ The initial project files should all be there, as well as a `models` directory t :::note If you run into permission errors, it may be because the `rasa/rasa` images run as user `1001` as a best practice, to avoid giving the container `root` permissions. -Hence, all files created by these containers will be owned by user `1001`. See the [Docker documentation](https://docs.docker.com/edge/engine/reference/commandline/run/) +Hence, all files created by these containers will be owned by user `1001`. See the [Docker documentation](https://docs.docker.com/reference/cli/docker/container/run/) if you want to run the containers as a different user. ::: diff --git a/rasa/core/brokers/kafka.py b/rasa/core/brokers/kafka.py index 7183be12746a..66e77c2ca385 100644 --- a/rasa/core/brokers/kafka.py +++ b/rasa/core/brokers/kafka.py @@ -260,9 +260,11 @@ def _publish(self, event: Dict[Text, Any]) -> None: on_delivery=delivery_report, ) - def _close(self) -> None: + async def close(self) -> None: self._cancelled = True self._poll_thread.join() + if self.producer: + self.producer.flush() @rasa.shared.utils.common.lazy_property def rasa_environment(self) -> Optional[Text]: diff --git a/tests/integration_tests/core/brokers/test_kafka.py b/tests/integration_tests/core/brokers/test_kafka.py index 89fcdbb2d7bd..6be6eaa89d48 100644 --- a/tests/integration_tests/core/brokers/test_kafka.py +++ b/tests/integration_tests/core/brokers/test_kafka.py @@ -1,9 +1,12 @@ +import pytest + from rasa.core.brokers.kafka import KafkaEventBroker from pytest import LogCaptureFixture import logging.config -def test_kafka_event_broker_valid(): +@pytest.mark.broker +async def test_kafka_event_broker_valid(): broker = KafkaEventBroker( url="localhost", topic="rasa", @@ -19,11 +22,11 @@ def test_kafka_event_broker_valid(): ) assert broker.producer.poll() == 1 finally: - broker.producer.flush() - broker._close() + await broker.close() -def test_kafka_event_broker_buffer_error_is_handled(caplog: LogCaptureFixture): +@pytest.mark.broker +async def test_kafka_event_broker_buffer_error_is_handled(caplog: LogCaptureFixture): broker = KafkaEventBroker( url="localhost", topic="rasa", @@ -48,5 +51,4 @@ def test_kafka_event_broker_buffer_error_is_handled(caplog: LogCaptureFixture): assert "Queue full" in caplog.text assert broker.producer.poll() == 1 finally: - broker.producer.flush() - broker._close() + await broker.close() diff --git a/tests/integration_tests/core/brokers/test_pika.py b/tests/integration_tests/core/brokers/test_pika.py index eb27f9ba9f09..b514b1f91c09 100644 --- a/tests/integration_tests/core/brokers/test_pika.py +++ b/tests/integration_tests/core/brokers/test_pika.py @@ -16,6 +16,7 @@ ) +@pytest.mark.broker async def test_pika_event_broker_connect(): broker = PikaEventBroker( host=RABBITMQ_HOST, @@ -31,6 +32,7 @@ async def test_pika_event_broker_connect(): await broker.close() +@pytest.mark.broker @pytest.mark.xdist_group("rabbitmq") async def test_pika_event_broker_publish_after_restart( docker_client: docker.DockerClient, @@ -102,6 +104,7 @@ async def test_pika_event_broker_publish_after_restart( rabbitmq_container.remove() +@pytest.mark.broker @pytest.mark.xdist_group("rabbitmq") @pytest.mark.parametrize("host_component", ["localhost", "myuser:mypassword@localhost"]) async def test_pika_event_broker_connect_with_path_and_query_params_in_url( diff --git a/tests/integration_tests/core/test_exporter.py b/tests/integration_tests/core/test_exporter.py new file mode 100644 index 000000000000..2b3e8b83edb4 --- /dev/null +++ b/tests/integration_tests/core/test_exporter.py @@ -0,0 +1,117 @@ +import textwrap +from pathlib import Path +from unittest.mock import Mock + +import pytest + +from pytest import MonkeyPatch + +from rasa.core.brokers.kafka import KafkaEventBroker +from rasa.core.exporter import Exporter +from rasa.core.tracker_store import InMemoryTrackerStore +from rasa.shared.core.domain import Domain +from rasa.shared.core.events import ActionExecuted +from rasa.shared.core.trackers import DialogueStateTracker + + +@pytest.mark.broker +async def test_exporter_publishes_to_kafka_broker_success( + tmp_path: Path, +) -> None: + tracker_store = InMemoryTrackerStore(domain=Domain.empty()) + tracker = DialogueStateTracker.from_events( + "test_export", + [ + ActionExecuted("action_listen"), + ], + ) + + await tracker_store.save(tracker) + + kafka_broker = KafkaEventBroker( + url="localhost", + topic="rasa", + sasl_username="admin", + sasl_password="password", + partition_by_sender=True, + ) + + endpoints_file = tmp_path / "endpoints.yml" + endpoints_file.write_text( + textwrap.dedent( + """ + event_broker: + type: kafka + topic: rasa + url: localhost:9092 + client_id: kafka-python-rasa + partition_by_sender: true + security_protocol: SASL_PLAINTEXT + sasl_username: admin + sasl_password: password + sasl_mechanism: PLAIN + """ + ) + ) + + exporter = Exporter(tracker_store, kafka_broker, str(endpoints_file)) + + published_events = await exporter.publish_events() + assert published_events == 1 + + +@pytest.mark.broker +async def test_exporter_publishes_to_kafka_broker_fail( + tmp_path: Path, + monkeypatch: MonkeyPatch, +) -> None: + tracker_store = InMemoryTrackerStore(domain=Domain.empty()) + tracker = DialogueStateTracker.from_events( + "test_export", + [ + ActionExecuted("action_listen"), + ], + ) + + await tracker_store.save(tracker) + + kafka_broker = KafkaEventBroker( + url="localhost", + topic="rasa", + sasl_username="admin", + sasl_password="password", + partition_by_sender=True, + ) + + endpoints_file = tmp_path / "endpoints.yml" + endpoints_file.write_text( + textwrap.dedent( + """ + event_broker: + type: kafka + topic: rasa + url: localhost:9092 + client_id: kafka-python-rasa + partition_by_sender: true + security_protocol: SASL_PLAINTEXT + sasl_username: admin + sasl_password: password + sasl_mechanism: PLAIN + """ + ) + ) + + exporter = Exporter(tracker_store, kafka_broker, str(endpoints_file)) + + # patch the exporter to raise an exception when publishing events + monkeypatch.setattr(exporter, "publish_events", Mock(side_effect=Exception)) + + with pytest.raises(Exception) as error: + await exporter.publish_events() + assert "Producer terminating with 1 messages" in str(error.value) + assert ( + "still in queue or transit: use flush() to wait for " + "outstanding message delivery" in str(error.value) + ) + # necessary for producer teardown + await kafka_broker.close()