[ATO-2122] Backport rasa export Kafka bugfix to 3.6.x #13017

Merged · 2 commits · Feb 23, 2024
Changes from all commits:
110 changes: 94 additions & 16 deletions .github/workflows/continous-integration.yml
@@ -618,10 +618,6 @@ jobs:
       POSTGRES_PORT: 5432
       POSTGRES_USER: postgres
       POSTGRES_PASSWORD: postgres
-      RABBITMQ_HOST: localhost
-      RABBITMQ_PORT: 5672
-      RABBITMQ_USER: guest
-      RABBITMQ_PASSWORD: guest
 
     services:
       redis:
@@ -653,12 +649,6 @@ jobs:
          # mapping container ports to the host
          - 5432:5432
 
-      rabbitmq:
-        # see https://github.com/docker-library/healthcheck/blob/master/rabbitmq/docker-healthcheck
-        image: healthcheck/rabbitmq
-        ports:
-          - 5672:5672
-
       mongodb:
         image: mongodb/mongodb-community-server:6.0.4-ubuntu2204
         options: >-
@@ -728,6 +718,94 @@ jobs:
           if grep 'The lock file is not up to date' .output; then exit 1; fi
           make prepare-tests-ubuntu
 
+      - name: Test Code with Services 🩺
+        if: needs.changes.outputs.backend == 'true'
+        env:
+          JOBS: 2
+          INTEGRATION_TEST_PYTEST_MARKERS: '"(not sequential) and (not broker)"'
+          PYTHONIOENCODING: "utf-8"
+        run: |
+          make test-integration
+
+  broker_integration_test:
+    name: Run Broker Integration Tests
+    if: github.ref_type != 'tag'
+    runs-on: ubuntu-22.04
+    timeout-minutes: 60
+    needs: [changes]
+    env:
+      RABBITMQ_HOST: localhost
+      RABBITMQ_PORT: 5672
+      RABBITMQ_USER: guest
+      RABBITMQ_PASSWORD: guest
+
+    services:
+      rabbitmq:
+        # see https://github.com/docker-library/healthcheck/blob/master/rabbitmq/docker-healthcheck
+        image: healthcheck/rabbitmq
+        ports:
+          - 5672:5672
+
+    steps:
+      - name: Checkout git repository 🕝
+        if: needs.changes.outputs.backend == 'true'
+        uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c
+
+      - name: Set up Python ${{ env.DEFAULT_PYTHON_VERSION }} 🐍
+        if: needs.changes.outputs.backend == 'true'
+        uses: actions/setup-python@57ded4d7d5e986d7296eab16560982c6dd7c923b
+        with:
+          python-version: ${{ env.DEFAULT_PYTHON_VERSION }}
+
+      - name: Read Poetry Version 🔢
+        if: needs.changes.outputs.backend == 'true'
+        run: |
+          echo "POETRY_VERSION=$(scripts/poetry-version.sh)" >> $GITHUB_ENV
+        shell: bash
+
+      - name: Install poetry 🦄
+        if: needs.changes.outputs.backend == 'true'
+        uses: Gr1N/setup-poetry@15821dc8a61bc630db542ae4baf6a7c19a994844 # v8
+        with:
+          poetry-version: ${{ env.POETRY_VERSION }}
+
+      - name: Load Poetry Cached Libraries ⬇
+        id: cache-poetry
+        if: needs.changes.outputs.backend == 'true'
+        uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8
+        with:
+          path: .venv
+          key: ${{ runner.os }}-poetry-${{ env.POETRY_VERSION }}-${{ env.DEFAULT_PYTHON_VERSION }}-${{ hashFiles('**/poetry.lock') }}-venv-${{ secrets.POETRY_CACHE_VERSION }}-${{ env.pythonLocation }}
+
+      - name: Clear Poetry cache
+        if: steps.cache-poetry.outputs.cache-hit == 'true' && needs.changes.outputs.backend == 'true' && contains(github.event.pull_request.labels.*.name, 'tools:clear-poetry-cache-unit-tests')
+        run: rm -r .venv
+
+      # Poetry >= 1.1.0b uses virtualenv to create a virtual environment.
+      # The virtualenv simply doesn't work on Windows with our setup,
+      # that's why we use venv to create virtual environment
+      - name: Create virtual environment
+        if: (steps.cache-poetry.outputs.cache-hit != 'true' || contains(github.event.pull_request.labels.*.name, 'tools:clear-poetry-cache-unit-tests')) && needs.changes.outputs.backend == 'true'
+        run: python -m venv create .venv
+
+      - name: Set up virtual environment
+        if: needs.changes.outputs.backend == 'true'
+        # Poetry on Windows cannot pick up the virtual environments directory properly,
+        # and it creates a new one every time the pipeline runs.
+        # This step solves this problem — it tells poetry to always use `.venv` directory inside
+        # the project itself, which also makes it easier for us to determine the correct directory
+        # that needs to be cached.
+        run: poetry config virtualenvs.in-project true
+
+      - name: Install Dependencies (Linux) 📦
+        if: needs.changes.outputs.backend == 'true'
+        run: |
+          sudo apt-get -y install libpq-dev
+          make install-full | tee .output
+          if grep 'The lock file is not up to date' .output; then exit 1; fi
+          make prepare-tests-ubuntu
+          make prepare-spacy
+
       - name: Run kafka and zookeeper containers for integration testing
         if: needs.changes.outputs.backend == 'true'
         run: |
@@ -737,11 +815,16 @@ jobs:
         if: needs.changes.outputs.backend == 'true'
         env:
           JOBS: 2
-          INTEGRATION_TEST_PYTEST_MARKERS: '"not sequential"'
+          INTEGRATION_TEST_PYTEST_MARKERS: "broker"
           PYTHONIOENCODING: "utf-8"
         run: |
           make test-integration
 
+      - name: Stop kafka and zookeeper containers for integration testing
+        if: needs.changes.outputs.backend == 'true'
+        run: |
+          docker-compose -f tests_deployment/docker-compose.kafka.yml down
+
   sequential_integration_test:
     name: Run Sequential Integration Tests
     if: github.ref_type != 'tag'
@@ -841,11 +924,6 @@ jobs:
         run: |
           make test-integration
 
-      - name: Stop kafka and zookeeper containers for integration testing
-        if: needs.changes.outputs.backend == 'true'
-        run: |
-          docker-compose -f tests_deployment/docker-compose.kafka.yml down
-
   build_docker_base_images_and_set_env:
     name: Build Docker base images and setup environment
     runs-on: ubuntu-22.04
2 changes: 1 addition & 1 deletion Makefile
@@ -2,7 +2,7 @@
 
 JOBS ?= 1
 INTEGRATION_TEST_FOLDER = tests/integration_tests/
-INTEGRATION_TEST_PYTEST_MARKERS ?= "sequential or not sequential"
+INTEGRATION_TEST_PYTEST_MARKERS ?= "sequential or broker or ((not sequential) and (not broker))"
 PLATFORM ?= "linux/amd64"
 
 help:
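The marker expression is what splits the integration suite across the CI jobs above: `sequential` tests, `broker` tests, and everything else each run in their own job. Below is a minimal sketch of how pytest's `-m` filtering treats these markers; the test names are hypothetical, and it assumes `sequential` and `broker` are registered markers in the project's pytest configuration.

```python
import pytest


@pytest.mark.sequential
def test_must_run_alone() -> None:
    """Selected by -m "sequential" (the sequential_integration_test job)."""


@pytest.mark.broker
def test_needs_kafka_or_rabbitmq() -> None:
    """Selected by -m "broker" (the broker_integration_test job)."""


def test_everything_else() -> None:
    """Unmarked, so selected by -m "(not sequential) and (not broker)"."""
```

Like the old `"sequential or not sequential"` default, the new `"sequential or broker or ((not sequential) and (not broker))"` is deliberately a tautology: a plain `make test-integration` still runs the whole suite, while each CI job narrows it to one disjoint slice by overriding `INTEGRATION_TEST_PYTEST_MARKERS`.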
1 change: 1 addition & 0 deletions changelog/13017.bugfix.md
@@ -0,0 +1 @@
+Flush messages when Kafka producer is closed. This is to ensure that all messages in the producer's internal queue are sent to the broker.
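The one-line summary is worth unpacking: confluent-kafka's `produce()` is asynchronous and only enqueues the message in librdkafka's internal buffer, so a short-lived process such as `rasa export` can exit while events are still queued. A minimal sketch of the failure mode, assuming a broker at `localhost:9092` (illustrative, not the Rasa code itself):

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

# produce() only appends to an in-memory queue and returns immediately;
# delivery happens later on a background thread.
producer.produce("rasa", b'{"event": "action", "name": "action_listen"}')

# Without an explicit flush, the process can exit before delivery and the
# queued events are silently lost.
remaining = producer.flush(10.0)  # block up to 10s until the queue drains
assert remaining == 0, f"{remaining} message(s) were never delivered"
```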
2 changes: 1 addition & 1 deletion docs/docs/docker/building-in-docker.mdx
@@ -67,7 +67,7 @@ The initial project files should all be there, as well as a `models` directory t
 :::note
 If you run into permission errors, it may be because the `rasa/rasa` images
 run as user `1001` as a best practice, to avoid giving the container `root` permissions.
-Hence, all files created by these containers will be owned by user `1001`. See the [Docker documentation](https://docs.docker.com/edge/engine/reference/commandline/run/)
+Hence, all files created by these containers will be owned by user `1001`. See the [Docker documentation](https://docs.docker.com/reference/cli/docker/container/run/)
 if you want to run the containers as a different user.
 
 :::
4 changes: 3 additions & 1 deletion rasa/core/brokers/kafka.py
@@ -260,9 +260,11 @@ def _publish(self, event: Dict[Text, Any]) -> None:
             on_delivery=delivery_report,
         )
 
-    def _close(self) -> None:
+    async def close(self) -> None:
         self._cancelled = True
         self._poll_thread.join()
+        if self.producer:
+            self.producer.flush()
 
     @rasa.shared.utils.common.lazy_property
     def rasa_environment(self) -> Optional[Text]:
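Two things change here: the private synchronous `_close()` becomes the async `close()` that callers such as `rasa export` actually await on shutdown, and the producer is flushed once the poll thread has been joined. A rough sketch of the lifecycle this method tears down, inferred from the `_cancelled`, `_poll_thread`, and `producer` names in the diff rather than copied from the Rasa sources:

```python
import threading

from confluent_kafka import Producer


class SketchKafkaBroker:
    """Illustrative only: mirrors the shutdown order of the patched close()."""

    def __init__(self) -> None:
        self.producer = Producer({"bootstrap.servers": "localhost:9092"})
        self._cancelled = False
        self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()

    def _poll_loop(self) -> None:
        # Serve delivery callbacks until close() flips the flag.
        while not self._cancelled:
            self.producer.poll(0.1)

    async def close(self) -> None:
        self._cancelled = True    # stop the background poll loop
        self._poll_thread.join()  # wait for it to exit
        if self.producer:
            self.producer.flush()  # drain queued messages before shutdown
```

Flushing after the join is safe because `flush()` itself serves outstanding delivery callbacks while it waits for the queue to drain.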
14 changes: 8 additions & 6 deletions tests/integration_tests/core/brokers/test_kafka.py
@@ -1,9 +1,12 @@
+import pytest
+
 from rasa.core.brokers.kafka import KafkaEventBroker
 from pytest import LogCaptureFixture
 import logging.config
 
 
-def test_kafka_event_broker_valid():
+@pytest.mark.broker
+async def test_kafka_event_broker_valid():
     broker = KafkaEventBroker(
         url="localhost",
         topic="rasa",
@@ -19,11 +22,11 @@ def test_kafka_event_broker_valid():
         )
         assert broker.producer.poll() == 1
     finally:
-        broker.producer.flush()
-        broker._close()
+        await broker.close()
 
 
-def test_kafka_event_broker_buffer_error_is_handled(caplog: LogCaptureFixture):
+@pytest.mark.broker
+async def test_kafka_event_broker_buffer_error_is_handled(caplog: LogCaptureFixture):
     broker = KafkaEventBroker(
         url="localhost",
         topic="rasa",
@@ -48,5 +51,4 @@ def test_kafka_event_broker_buffer_error_is_handled(caplog: LogCaptureFixture):
         assert "Queue full" in caplog.text
         assert broker.producer.poll() == 1
     finally:
-        broker.producer.flush()
-        broker._close()
+        await broker.close()
3 changes: 3 additions & 0 deletions tests/integration_tests/core/brokers/test_pika.py
@@ -16,6 +16,7 @@
 )
 
 
+@pytest.mark.broker
 async def test_pika_event_broker_connect():
     broker = PikaEventBroker(
         host=RABBITMQ_HOST,
@@ -31,6 +32,7 @@ async def test_pika_event_broker_connect():
     await broker.close()
 
 
+@pytest.mark.broker
 @pytest.mark.xdist_group("rabbitmq")
 async def test_pika_event_broker_publish_after_restart(
     docker_client: docker.DockerClient,
@@ -102,6 +104,7 @@ async def test_pika_event_broker_publish_after_restart(
     rabbitmq_container.remove()
 
 
+@pytest.mark.broker
 @pytest.mark.xdist_group("rabbitmq")
 @pytest.mark.parametrize("host_component", ["localhost", "myuser:mypassword@localhost"])
 async def test_pika_event_broker_connect_with_path_and_query_params_in_url(
117 changes: 117 additions & 0 deletions tests/integration_tests/core/test_exporter.py
@@ -0,0 +1,117 @@
+import textwrap
+from pathlib import Path
+from unittest.mock import Mock
+
+import pytest
+
+from pytest import MonkeyPatch
+
+from rasa.core.brokers.kafka import KafkaEventBroker
+from rasa.core.exporter import Exporter
+from rasa.core.tracker_store import InMemoryTrackerStore
+from rasa.shared.core.domain import Domain
+from rasa.shared.core.events import ActionExecuted
+from rasa.shared.core.trackers import DialogueStateTracker
+
+
+@pytest.mark.broker
+async def test_exporter_publishes_to_kafka_broker_success(
+    tmp_path: Path,
+) -> None:
+    tracker_store = InMemoryTrackerStore(domain=Domain.empty())
+    tracker = DialogueStateTracker.from_events(
+        "test_export",
+        [
+            ActionExecuted("action_listen"),
+        ],
+    )
+
+    await tracker_store.save(tracker)
+
+    kafka_broker = KafkaEventBroker(
+        url="localhost",
+        topic="rasa",
+        sasl_username="admin",
+        sasl_password="password",
+        partition_by_sender=True,
+    )
+
+    endpoints_file = tmp_path / "endpoints.yml"
+    endpoints_file.write_text(
+        textwrap.dedent(
+            """
+            event_broker:
+              type: kafka
+              topic: rasa
+              url: localhost:9092
+              client_id: kafka-python-rasa
+              partition_by_sender: true
+              security_protocol: SASL_PLAINTEXT
+              sasl_username: admin
+              sasl_password: password
+              sasl_mechanism: PLAIN
+            """
+        )
+    )
+
+    exporter = Exporter(tracker_store, kafka_broker, str(endpoints_file))
+
+    published_events = await exporter.publish_events()
+    assert published_events == 1
+
+
+@pytest.mark.broker
+async def test_exporter_publishes_to_kafka_broker_fail(
+    tmp_path: Path,
+    monkeypatch: MonkeyPatch,
+) -> None:
+    tracker_store = InMemoryTrackerStore(domain=Domain.empty())
+    tracker = DialogueStateTracker.from_events(
+        "test_export",
+        [
+            ActionExecuted("action_listen"),
+        ],
+    )
+
+    await tracker_store.save(tracker)
+
+    kafka_broker = KafkaEventBroker(
+        url="localhost",
+        topic="rasa",
+        sasl_username="admin",
+        sasl_password="password",
+        partition_by_sender=True,
+    )
+
+    endpoints_file = tmp_path / "endpoints.yml"
+    endpoints_file.write_text(
+        textwrap.dedent(
+            """
+            event_broker:
+              type: kafka
+              topic: rasa
+              url: localhost:9092
+              client_id: kafka-python-rasa
+              partition_by_sender: true
+              security_protocol: SASL_PLAINTEXT
+              sasl_username: admin
+              sasl_password: password
+              sasl_mechanism: PLAIN
+            """
+        )
+    )
+
+    exporter = Exporter(tracker_store, kafka_broker, str(endpoints_file))
+
+    # patch the exporter to raise an exception when publishing events
+    monkeypatch.setattr(exporter, "publish_events", Mock(side_effect=Exception))
+
+    with pytest.raises(Exception) as error:
+        await exporter.publish_events()
+        assert "Producer terminating with 1 messages" in str(error.value)
+        assert (
+            "still in queue or transit: use flush() to wait for "
+            "outstanding message delivery" in str(error.value)
+        )
+    # necessary for producer teardown
+    await kafka_broker.close()