Skip to content

Commit

Permalink
Fix: apply formatting
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed May 17, 2024
1 parent 0185068 commit 7f29851
Show file tree
Hide file tree
Showing 25 changed files with 916 additions and 612 deletions.
4 changes: 1 addition & 3 deletions src/integration-tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,5 @@ def pytest_collection_modifyitems(config, items):
continue

item.add_marker(
pytest.mark.skip(
reason=f"order = {order}, running {active_wave} only"
)
pytest.mark.skip(reason=f"order = {order}, running {active_wave} only")
)
7 changes: 6 additions & 1 deletion src/integration-tests/test_alarms.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import time

import blazingmq.dev.it.testconstants as tc
from blazingmq.dev.it.fixtures import Cluster, cluster, order, tweak # pylint: disable=unused-import
from blazingmq.dev.it.fixtures import (
Cluster,
cluster,
order,
tweak,
) # pylint: disable=unused-import


@tweak.cluster.queue_operations.consumption_monitor_period_ms(500)
Expand Down
115 changes: 69 additions & 46 deletions src/integration-tests/test_auto_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,34 @@
from blazingmq.dev.it.util import wait_until
from blazingmq.dev.configurator import Configurator


class TestAutoSubscriptions:
"""
This test verifies auto subscription for one or more substreams (apps)
"""
def _start_client(self, broker, uri, name, subscriptions=[]):

def _start_client(self, broker, uri, name, subscriptions=[]):
consumer = broker.create_client(name)
assert (
consumer.open(
uri,
flags=["read"],
consumer_priority=1,
subscriptions=subscriptions,
block=True)
block=True,
)
== Client.e_SUCCESS
)

return consumer

def _verify(self, domain, num):

assert len(self.consumer.list(block=True)) == 0

self.leader.list_messages(domain, tc.TEST_QUEUE, 0, 2)
assert self.leader.outputs_substr(f"Printing {num} message(s)", 1)

def _verify_fanout(self, domain, positiveApps, negativeAppIds, num):

for app in positiveApps:
self._verify_delivery(app, num)

Expand All @@ -57,8 +57,11 @@ def _verify_delivery(self, consumer, num):
assert msgs[0].payload == "123"

@tweak.domain.subscriptions(
[{"appId": "foo", "expression": {"version" : "E_VERSION_1", "text": "x==1"}},
{"appId": "bar", "expression": {"version" : "E_VERSION_1", "text": "x==2"}}])
[
{"appId": "foo", "expression": {"version": "E_VERSION_1", "text": "x==1"}},
{"appId": "bar", "expression": {"version": "E_VERSION_1", "text": "x==2"}},
]
)
def test_auto_subscription_fanout(self, cluster: Cluster):
proxies = cluster.proxy_cycle()

Expand All @@ -82,16 +85,20 @@ def test_auto_subscription_fanout(self, cluster: Cluster):

self.consumer = self._start_client(proxy, tc.URI_FANOUT_SC_FOO, "consumerFoo")

self.consumer_bar = self._start_client(proxy, tc.URI_FANOUT_SC_BAR, "consumerBar")
self.consumer_baz = self._start_client(proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz")
self.consumer_bar = self._start_client(
proxy, tc.URI_FANOUT_SC_BAR, "consumerBar"
)
self.consumer_baz = self._start_client(
proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz"
)

assert (
producer.post(
tc.URI_FANOUT_SC,
payload=["123"],
block=True,
wait_ack=True,
messageProperties=[{"name": "x", "value": "2", "type": "E_INT"}]
messageProperties=[{"name": "x", "value": "2", "type": "E_INT"}],
)
== Client.e_SUCCESS
)
Expand All @@ -100,10 +107,8 @@ def test_auto_subscription_fanout(self, cluster: Cluster):

self._verify(tc.DOMAIN_FANOUT_SC, 1)
self._verify_fanout(
tc.DOMAIN_FANOUT_SC,
[self.consumer_bar, self.consumer_baz],
["foo"],
1)
tc.DOMAIN_FANOUT_SC, [self.consumer_bar, self.consumer_baz], ["foo"], 1
)

assert self.consumer.stop_session(block=True) == Client.e_SUCCESS
assert self.consumer_bar.stop_session(block=True) == Client.e_SUCCESS
Expand All @@ -117,17 +122,19 @@ def test_auto_subscription_fanout(self, cluster: Cluster):

self.consumer = self._start_client(proxy, tc.URI_FANOUT_SC_FOO, "consumerFoo")

self.consumer_bar = self._start_client(proxy, tc.URI_FANOUT_SC_BAR, "consumerBar")
self.consumer_baz = self._start_client(proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz")
self.consumer_bar = self._start_client(
proxy, tc.URI_FANOUT_SC_BAR, "consumerBar"
)
self.consumer_baz = self._start_client(
proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz"
)

self.leader = cluster.last_known_leader

self._verify(tc.DOMAIN_FANOUT_SC, 1)
self._verify_fanout(
tc.DOMAIN_FANOUT_SC,
[self.consumer_bar, self.consumer_baz],
["foo"],
1)
tc.DOMAIN_FANOUT_SC, [self.consumer_bar, self.consumer_baz], ["foo"], 1
)

self.consumer_bar.confirm(tc.URI_FANOUT_SC_BAR, "*", succeed=True)
self.consumer_baz.confirm(tc.URI_FANOUT_SC_BAZ, "*", succeed=True)
Expand All @@ -137,7 +144,9 @@ def test_auto_subscription_fanout(self, cluster: Cluster):
assert len(self.consumer_bar.list(block=True)) == 0
assert len(self.consumer_baz.list(block=True)) == 0

@tweak.domain.subscriptions([{"appId": "", "expression": {"version" : "E_VERSION_1", "text": "x==1"}}])
@tweak.domain.subscriptions(
[{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}]
)
def test_auto_subscription_priority(self, cluster: Cluster):
"""
Configure the priority queue to evaluate auto subscription negatively.
Expand Down Expand Up @@ -166,7 +175,7 @@ def test_auto_subscription_priority(self, cluster: Cluster):
payload=["123"],
block=True,
wait_ack=True,
messageProperties=[{"name": "x", "value": "2", "type": "E_INT"}]
messageProperties=[{"name": "x", "value": "2", "type": "E_INT"}],
)
== Client.e_SUCCESS
)
Expand All @@ -188,8 +197,11 @@ def test_auto_subscription_priority(self, cluster: Cluster):
self._verify(tc.DOMAIN_PRIORITY_SC, 0)

@tweak.domain.subscriptions(
[{"appId": "foo", "expression": {"version" : "E_VERSION_1", "text": "x==1"}},
{"appId": "bar", "expression": {"version" : "E_VERSION_1", "text": "x > 2"}}])
[
{"appId": "foo", "expression": {"version": "E_VERSION_1", "text": "x==1"}},
{"appId": "bar", "expression": {"version": "E_VERSION_1", "text": "x > 2"}},
]
)
def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster):
"""
Out of the 3 apps, configure two to evaluate auto subscriptions.
Expand All @@ -215,22 +227,26 @@ def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster):
proxy,
tc.URI_FANOUT_SC_FOO,
"consumerFoo",
subscriptions=[{"correlationId": 1, "expression": "x == 2"}])
subscriptions=[{"correlationId": 1, "expression": "x == 2"}],
)

self.consumer_bar = self._start_client(
proxy,
tc.URI_FANOUT_SC_BAR,
"consumerBar",
subscriptions=[{"correlationId": 1, "expression": "x > 3"}])
self.consumer_baz = self._start_client(proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz")
subscriptions=[{"correlationId": 1, "expression": "x > 3"}],
)
self.consumer_baz = self._start_client(
proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz"
)

assert (
producer.post(
tc.URI_FANOUT_SC,
payload=["123"],
block=True,
wait_ack=True,
messageProperties=[{"name": "x", "value": "3", "type": "E_INT"}]
messageProperties=[{"name": "x", "value": "3", "type": "E_INT"}],
)
== Client.e_SUCCESS
)
Expand All @@ -248,7 +264,7 @@ def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster):
payload=["123"],
block=True,
wait_ack=True,
messageProperties=[{"name": "x", "value": "4", "type": "E_INT"}]
messageProperties=[{"name": "x", "value": "4", "type": "E_INT"}],
)
== Client.e_SUCCESS
)
Expand All @@ -271,14 +287,18 @@ def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster):
proxy,
tc.URI_FANOUT_SC_FOO,
"consumerFoo",
subscriptions=[{"correlationId": 1, "expression": "x == 2"}])
subscriptions=[{"correlationId": 1, "expression": "x == 2"}],
)

self.consumer_bar = self._start_client(
proxy,
tc.URI_FANOUT_SC_BAR,
"consumerBar",
subscriptions=[{"correlationId": 1, "expression": "x > 2"}])
self.consumer_baz = self._start_client(proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz")
subscriptions=[{"correlationId": 1, "expression": "x > 2"}],
)
self.consumer_baz = self._start_client(
proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz"
)

self.leader = cluster.last_known_leader

Expand All @@ -294,7 +314,9 @@ def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster):
assert len(self.consumer_bar.list(block=True)) == 0
assert len(self.consumer_baz.list(block=True)) == 0

@tweak.domain.subscriptions([{"appId": "", "expression": {"version" : "E_VERSION_1", "text": "x==1"}}])
@tweak.domain.subscriptions(
[{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}]
)
def test_auto_subscription_broadcast(self, cluster: Cluster):
"""
Configure the boadcast queue to evaluate auto subscription negatively.
Expand Down Expand Up @@ -322,7 +344,7 @@ def test_auto_subscription_broadcast(self, cluster: Cluster):
payload=["123"],
block=True,
wait_ack=True,
messageProperties=[{"name": "x", "value": "2", "type": "E_INT"}]
messageProperties=[{"name": "x", "value": "2", "type": "E_INT"}],
)
== Client.e_SUCCESS
)
Expand All @@ -333,11 +355,13 @@ def test_auto_subscription_broadcast(self, cluster: Cluster):

assert self.consumer.stop_session(block=True) == Client.e_SUCCESS


@tweak.domain.subscriptions(
[{"appId": "foo", "expression": {"version" : "E_VERSION_1", "text": "x==1"}},
{"appId": "bar", "expression": {"version" : "E_VERSION_1", "text": "x==2"}},
{"appId": "baz", "expression": {"version" : "E_VERSION_1", "text": "x==3"}}])
[
{"appId": "foo", "expression": {"version": "E_VERSION_1", "text": "x==1"}},
{"appId": "bar", "expression": {"version": "E_VERSION_1", "text": "x==2"}},
{"appId": "baz", "expression": {"version": "E_VERSION_1", "text": "x==3"}},
]
)
def test_auto_subscription_fanout_all_negative(self, cluster: Cluster):
"""
Configure all fanout Apps to evaluate auto subscriptions negatively.
Expand All @@ -358,30 +382,29 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster):

self.consumer = self._start_client(proxy, tc.URI_FANOUT_SC_FOO, "consumerFoo")

self.consumer_bar = self._start_client(proxy, tc.URI_FANOUT_SC_BAR, "consumerBar")
self.consumer_baz = self._start_client(proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz")
self.consumer_bar = self._start_client(
proxy, tc.URI_FANOUT_SC_BAR, "consumerBar"
)
self.consumer_baz = self._start_client(
proxy, tc.URI_FANOUT_SC_BAZ, "consumerBaz"
)

assert (
producer.post(
tc.URI_FANOUT_SC,
payload=["123"],
block=True,
wait_ack=True,
messageProperties=[{"name": "x", "value": "0", "type": "E_INT"}]
messageProperties=[{"name": "x", "value": "0", "type": "E_INT"}],
)
== Client.e_SUCCESS
)

self.leader = cluster.last_known_leader

self._verify(tc.DOMAIN_FANOUT_SC, 0)
self._verify_fanout(
tc.DOMAIN_FANOUT_SC,
[],
["foo", "bar", "baz"],
0)
self._verify_fanout(tc.DOMAIN_FANOUT_SC, [], ["foo", "bar", "baz"], 0)

assert len(self.consumer.list(block=True)) == 0
assert len(self.consumer_bar.list(block=True)) == 0
assert len(self.consumer_baz.list(block=True)) == 0

1 change: 0 additions & 1 deletion src/integration-tests/test_breathing.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ def _verify_max_messages_max_bytes_routing(producer, consumer, other_consumers):


def _verify_priority_routing(producers, consumers, lowPriorityConsumers):

# Verify no messages when we start
for consumer in consumers + lowPriorityConsumers:
try:
Expand Down
6 changes: 5 additions & 1 deletion src/integration-tests/test_broadcast.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from itertools import islice

import blazingmq.dev.it.testconstants as tc
from blazingmq.dev.it.fixtures import Cluster, cluster, order # pylint: disable=unused-import
from blazingmq.dev.it.fixtures import (
Cluster,
cluster,
order,
) # pylint: disable=unused-import
from blazingmq.dev.it.process.client import Client

pytestmark = order(3)
Expand Down
8 changes: 6 additions & 2 deletions src/integration-tests/test_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
"""

import blazingmq.dev.it.testconstants as tc
from blazingmq.dev.it.fixtures import Cluster, cluster, order # pylint: disable=unused-import
from blazingmq.dev.it.fixtures import (
Cluster,
cluster,
order,
) # pylint: disable=unused-import
from blazingmq.dev.it.util import random_string

pytestmark = order(10)

def test_compression_restart(cluster: Cluster):

def test_compression_restart(cluster: Cluster):
# Start a producer and post a message.
proxies = cluster.proxy_cycle()
producer = next(proxies).create_client("producer")
Expand Down
8 changes: 6 additions & 2 deletions src/integration-tests/test_fanout_priorities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
"""


from blazingmq.dev.it.fixtures import Cluster, cluster, order # pylint: disable=unused-import
from blazingmq.dev.it.fixtures import (
Cluster,
cluster,
order,
) # pylint: disable=unused-import
from blazingmq.dev.it.process.client import Client
from blazingmq.dev.it.util import wait_until

pytestmark = order(4)


def test_fanout_priorities(cluster: Cluster):
# create foo, bar, and baz clients on every node.

Expand Down Expand Up @@ -70,7 +75,6 @@ def test_fanout_priorities(cluster: Cluster):


def _verify_delivery(producers, message, highPriorityQueues, lowPriorityQueues):

for i, producer in enumerate(producers):
# there is one producer on each node
assert (
Expand Down
Loading

0 comments on commit 7f29851

Please sign in to comment.