Skip to content

Commit

Permalink
Bugfix/class test (#6)
Browse files Browse the repository at this point in the history
* Added shutdown timeout option.

* Formatting.

* Added files.

* Added tests

* Github actions setup.

* Bumped poetry version.

* Added pyproject.toml

* Removed deployment environment.

* Removed -m

* Fixed class test to represent current functionality.

* Bumped version.
  • Loading branch information
DustinMoriarty authored Aug 14, 2023
1 parent 5e2a514 commit 0793a96
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "asyncio-signal-bus"
version = "1.1.0"
version = "1.1.1"
description = "Internal application publisher/subscriber bus using asyncio queues."
authors = ["DustinMoriarty <[email protected]>"]
readme = "README.md"
Expand Down
39 changes: 39 additions & 0 deletions tests/test_signal_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,45 @@ async def foo_subscriber(signal: str):
assert expected_output == results


@pytest.mark.asyncio
async def test_round_trip_in_same_class():
result_queue = Queue()

class FooPublisher:
bus = SignalBus()

def __init__(self):
self.foo_subscriber = self.bus.subscriber(topic_name="foo")(
self.foo_subscriber
)
self.foo_publisher = self.bus.publisher(topic_name="foo")(
self.foo_publisher
)

async def foo_publisher(self, arg: str):
print("Publishing message.")
await asyncio.sleep(1)
return f"message:{arg}"

async def foo_subscriber(self, signal: str):
print("Received message.")
await asyncio.sleep(1)
await result_queue.put(signal)

input = ["a", "b", "c"]
expected_output = ["message:a", "message:b", "message:c"]

foo_publisher = FooPublisher()

async with foo_publisher.bus:
await asyncio.gather(*[foo_publisher.foo_publisher(x) for x in input])
results = []
while not result_queue.empty():
results.append(await result_queue.get())
results.sort()
assert expected_output == results


@pytest.mark.asyncio
async def test_chaining():
logging.getLogger().setLevel("DEBUG")
Expand Down

0 comments on commit 0793a96

Please sign in to comment.