From dbfbf98d01fd1e6c4d91e6aeeec40b65df057ec3 Mon Sep 17 00:00:00 2001 From: Craig <3979063+craig8@users.noreply.github.com> Date: Thu, 11 May 2023 09:46:09 -0700 Subject: [PATCH 01/15] Update run-tests.yml Fixed action version --- .github/workflows/run-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index f616d06..8cdc647 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -46,7 +46,7 @@ jobs: - name: Set up Python ${{ matrix.python }} id: setup-python - uses: actions/setup-python@v4.6 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python }} From d70d0c6c4c9b02f6615f62279acd15829814973e Mon Sep 17 00:00:00 2001 From: Craig <3979063+craig8@users.noreply.github.com> Date: Thu, 11 May 2023 17:22:25 +0000 Subject: [PATCH 02/15] Added pytest-timeout to dependencies --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 0742372..246fce7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ anypubsub = "^0.6" grequests = "^0.6.0" volttron = ">=10.0.3a9,<11.0" docker = "^6.0.1" +pytest-timeout = "^2.1.0" [tool.poetry.group.dev.dependencies] # formatting, quality, tests From 5640c2dcbd4d347591f6b2247bee81134cbbaed7 Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Sat, 17 Jun 2023 19:53:48 -0700 Subject: [PATCH 03/15] subscribe by tags initial commit --- tests/subsystems/test_pubsub_subsystem.py | 212 ++++++++++++++++++++++ 1 file changed, 212 insertions(+) create mode 100644 tests/subsystems/test_pubsub_subsystem.py diff --git a/tests/subsystems/test_pubsub_subsystem.py b/tests/subsystems/test_pubsub_subsystem.py new file mode 100644 index 0000000..ece4dcb --- /dev/null +++ b/tests/subsystems/test_pubsub_subsystem.py @@ -0,0 +1,212 @@ +from datetime import datetime + +import gevent +import pytest +from mock import MagicMock + +from volttron.client.messaging import headers as headers_mod +from volttron.client.vip.agent import Agent +from volttron.client.vip.agent import PubSub +from volttron.utils import format_timestamp +from volttrontesting.utils import (poll_gevent_sleep, + messages_contains_prefix) + + +class _publish_from_handler_test_agent(Agent): + def __init__(self, **kwargs): + super(_publish_from_handler_test_agent, self).__init__(**kwargs) + self.subscription_results = {} + PubSub.subscribe('pubsub', '') + + @PubSub.subscribe('pubsub', '') + def onmessage(self, peer, sender, bus, topic, headers, message): + self.subscription_results[topic] = {'headers': headers, + 'message': message} + if not topic.startswith("testtopic2/test"): + self.vip.pubsub.publish("pubsub", "testtopic2/test", + headers={"foo": "bar"}, + message="Test message").get(timeout=2.0) + + def setup_callback(self, topic): + self.vip.pubsub.subscribe(peer="pubsub", prefix=topic, + callback=self.onmessage).get(timeout=2.0) + + def reset_results(self): + self.subscription_results = {} + + + +@pytest.mark.pubsub +def test_publish_from_message_handler(volttron_instance): + """ Tests the ability to change a status by sending a different status + code. + + This test also tests that the heartbeat is received. + + :param volttron_instance: + :return: + """ + test_topic = "testtopic1/test" + new_agent1 = volttron_instance.build_agent(identity='test_publish1', + agent_class=_publish_from_handler_test_agent) + + new_agent2 = volttron_instance.build_agent(identity='test_publish2') + + # new_agent1.setup_callback("") + + new_agent2.vip.pubsub.publish("pubsub", test_topic, headers={}, + message="Test message").get() + + poll_gevent_sleep(2, lambda: messages_contains_prefix(test_topic, + new_agent1.subscription_results)) + + assert new_agent1.subscription_results[test_topic][ + "message"] == "Test message" + + +@pytest.mark.pubsub +def test_multi_unsubscribe(volttron_instance): + subscriber_agent = volttron_instance.build_agent() + subscriber_agent.subscription_callback = MagicMock( + callback='subscription_callback') + subscriber_agent.subscription_callback.reset_mock() + + # test unsubscribe all when there are no subscriptions + subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, + callback=None) + + publisher_agent = volttron_instance.build_agent() + + topic_to_check = "testtopic1/test/foo/bar/one" + test_topic1 = "testtopic1/test/foo/bar" + test_topic2 = "testtopic1/test/foo" + test_topic3 = "testtopic1" + + subscriber_agent.vip.pubsub.subscribe( + peer='pubsub', prefix=test_topic1, + callback=subscriber_agent.subscription_callback) + subscriber_agent.vip.pubsub.subscribe( + peer='pubsub', prefix=test_topic2, + callback=subscriber_agent.subscription_callback) + subscriber_agent.vip.pubsub.subscribe( + peer='pubsub', prefix=test_topic3, + callback=subscriber_agent.subscription_callback) + gevent.sleep(1) + + publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, + message="test message 1") + gevent.sleep(1) + + assert subscriber_agent.subscription_callback.call_count == 3 + subscriber_agent.subscription_callback.reset_mock() + + subscriber_agent.vip.pubsub.unsubscribe(peer='pubsub', + prefix="testtopic1/test/foo/bar", + callback=None) + gevent.sleep(1) + + publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, + message="test message 2") + gevent.sleep(1) + + assert subscriber_agent.subscription_callback.call_count == 2 + subscriber_agent.subscription_callback.reset_mock() + + subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, + callback=None) + gevent.sleep(1) + + publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, + message="test message 3") + gevent.sleep(1) + + assert subscriber_agent.subscription_callback.call_count == 0 + + +class TestAgent(Agent): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.subscription_results = dict() + self.instance_subscription_results = dict() + + @PubSub.subscribe_by_tags('pubsub', 'devices', all_platforms=True) + def on_match(self, peer, sender, bus, topic, headers, message): + print("on_match") + self.subscription_results[topic] = {'headers': headers, + 'message': message} + + def callback_method(self, peer, sender, bus, topic, headers, message): + self.instance_subscription_results[topic] = {'headers': headers, + 'message': message} + + def reset_results(self): + self.subscription_results = dict() + self.instance_subscription_results = dict() + + +@pytest.fixture(scope="module") +def test_agents(volttron_instance): + pub_agent = volttron_instance.build_agent() + agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgent) + yield pub_agent, agent + pub_agent.core.stop() + agent.core.stop() + + +def test_subscribe_by_tags_class_method(volttron_instance, test_agents): + pub_agent, agent = test_agents + try: + # TestAgent subscribes to "devices" tag condition. publish to devices and check agent.subscription_results + all_message = [{'OutsideAirTemperature': 0.5, + 'MixedAirTemperature': 0.2}, + {'OutsideAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'}, + 'MixedAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'} + }] + + # Create timestamp + now = format_timestamp(datetime.utcnow()) + + # now = '2015-12-02T00:00:00' + headers = { + headers_mod.DATE: now, + headers_mod.TIMESTAMP: now + } + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/d1/all", headers, all_message).get(timeout=10) + gevent.sleep(1) + assert agent.subscription_results["devices/campus/b1/d1/all"]["headers"] == headers + assert agent.subscription_results["devices/campus/b1/d1/all"]["message"] == all_message + finally: + agent.reset_results() + + +def test_subscribe_by_tags_instance_method(volttron_instance, test_agents): + pub_agent, agent = test_agents + try: + # Subscribe to subscribe_by_tags instance method and check result + agent.vip.pubsub.subscribe_by_tags('pubsub', "devices/campus/b1", agent.callback_method) + + all_message = [{'OutsideAirTemperature': 0.5, + 'MixedAirTemperature': 0.2}, + {'OutsideAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'}, + 'MixedAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'} + }] + + # Create timestamp + now = format_timestamp(datetime.utcnow()) + + # now = '2015-12-02T00:00:00' + headers = { + headers_mod.DATE: now, + headers_mod.TIMESTAMP: now + } + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/d1/all", headers, all_message).get(timeout=10) + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d1/all", headers, all_message).get(timeout=10) + gevent.sleep(1) + assert agent.subscription_results["devices/campus/b1/d1/all"]["headers"] == headers + assert agent.subscription_results["devices/campus/b1/d1/all"]["message"] == all_message + finally: + agent.reset_results() + From 9da17c8a73fd260386f4e36ccb0b317d6c1fce0a Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Fri, 23 Jun 2023 14:46:22 -0700 Subject: [PATCH 04/15] subscribe by tags updated test cases and fixes based on initial round of testing --- pyproject.toml | 3 +- tests/subsystems/test_pubsub_subsystem.py | 179 ++++++++++++++++------ 2 files changed, 137 insertions(+), 45 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 246fce7..3b6086b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ pytest = "^6.2.5" mock = "^4.0.3" anypubsub = "^0.6" grequests = "^0.6.0" -volttron = ">=10.0.3a9,<11.0" +#volttron = ">=10.0.3a9,<11.0" docker = "^6.0.1" pytest-timeout = "^2.1.0" @@ -39,6 +39,7 @@ mypy = "^0.942" coverage = "^6.3.2" Sphinx = "^4.5.0" sphinx-rtd-theme = "^1.0.0" +volttron = {path = "../volttron-core", develop = true} [tool.yapfignore] ignore_patterns = [ diff --git a/tests/subsystems/test_pubsub_subsystem.py b/tests/subsystems/test_pubsub_subsystem.py index ece4dcb..ac0a03f 100644 --- a/tests/subsystems/test_pubsub_subsystem.py +++ b/tests/subsystems/test_pubsub_subsystem.py @@ -1,8 +1,9 @@ from datetime import datetime +from unittest import mock import gevent import pytest -from mock import MagicMock +from mock import MagicMock, patch from volttron.client.messaging import headers as headers_mod from volttron.client.vip.agent import Agent @@ -130,7 +131,7 @@ def __init__(self, **kwargs): self.subscription_results = dict() self.instance_subscription_results = dict() - @PubSub.subscribe_by_tags('pubsub', 'devices', all_platforms=True) + @PubSub.subscribe_by_tags('pubsub', 'condition_devices', all_platforms=True) def on_match(self, peer, sender, bus, topic, headers, message): print("on_match") self.subscription_results[topic] = {'headers': headers, @@ -147,36 +148,44 @@ def reset_results(self): @pytest.fixture(scope="module") def test_agents(volttron_instance): - pub_agent = volttron_instance.build_agent() - agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgent) - yield pub_agent, agent + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["devices/campus/b1/device_test_class_method"] + pub_agent = volttron_instance.build_agent() + agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgent) + gevent.sleep(0.5) + yield pub_agent, agent pub_agent.core.stop() agent.core.stop() -def test_subscribe_by_tags_class_method(volttron_instance, test_agents): - pub_agent, agent = test_agents - try: - # TestAgent subscribes to "devices" tag condition. publish to devices and check agent.subscription_results - all_message = [{'OutsideAirTemperature': 0.5, - 'MixedAirTemperature': 0.2}, - {'OutsideAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'}, - 'MixedAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'} - }] - - # Create timestamp - now = format_timestamp(datetime.utcnow()) - - # now = '2015-12-02T00:00:00' - headers = { +all_message = [{'OutsideAirTemperature': 0.5, + 'MixedAirTemperature': 0.2}, + {'OutsideAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'}, + 'MixedAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'} + }] +now = format_timestamp(datetime.utcnow()) +headers = { headers_mod.DATE: now, headers_mod.TIMESTAMP: now } + + +def test_subscribe_by_tags_class_method(volttron_instance, test_agents, mocker): + pub_agent, agent = test_agents + try: + # TestAgent subscribes to "devices/campus/b1/device_test_class_method" tag condition using the + # @Pusbub.subscribe_by_tags decorator + + # publish to devices and check agent.subscription_results # Publish messages - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/d1/all", headers, all_message).get(timeout=10) - gevent.sleep(1) - assert agent.subscription_results["devices/campus/b1/d1/all"]["headers"] == headers - assert agent.subscription_results["devices/campus/b1/d1/all"]["message"] == all_message + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", + headers, all_message).get(timeout=10) + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/d2/all", headers, all_message).get(timeout=10) + gevent.sleep(0.5) + + assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers + assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["message"] == all_message + assert agent.subscription_results.get("devices/campus/b1/d2/all") is None finally: agent.reset_results() @@ -184,29 +193,111 @@ def test_subscribe_by_tags_class_method(volttron_instance, test_agents): def test_subscribe_by_tags_instance_method(volttron_instance, test_agents): pub_agent, agent = test_agents try: - # Subscribe to subscribe_by_tags instance method and check result - agent.vip.pubsub.subscribe_by_tags('pubsub', "devices/campus/b1", agent.callback_method) + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["devices/campus/b2/d2"] - all_message = [{'OutsideAirTemperature': 0.5, - 'MixedAirTemperature': 0.2}, - {'OutsideAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'}, - 'MixedAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'} - }] + # Subscribe to subscribe_by_tags instance method and check result + agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", agent.callback_method) - # Create timestamp - now = format_timestamp(datetime.utcnow()) + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", headers, + all_message).get(timeout=10) + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) + gevent.sleep(0.5) - # now = '2015-12-02T00:00:00' - headers = { - headers_mod.DATE: now, - headers_mod.TIMESTAMP: now - } - # Publish messages - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/d1/all", headers, all_message).get(timeout=10) - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d1/all", headers, all_message).get(timeout=10) - gevent.sleep(1) - assert agent.subscription_results["devices/campus/b1/d1/all"]["headers"] == headers - assert agent.subscription_results["devices/campus/b1/d1/all"]["message"] == all_message + assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers + assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"][ + "message"] == all_message + + assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message finally: agent.reset_results() + +def test_subscribe_by_tags_refresh_tags(volttron_instance, test_agents): + pub_agent, test_agent = test_agents + agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgent, tag_refresh_interval=5) + gevent.sleep(0.5) + try: + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["devices/campus/b2/d2"] + + # Subscribe to subscribe_by_tags instance method and check result + agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2", agent.callback_method) + gevent.sleep(0.5) + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) + gevent.sleep(0.5) + + assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message + agent.reset_results() + + # Update tag query result + mock_tag_method.return_value = ["devices/campus/b2/d2", "devices/campus/b2/d3"] + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) + gevent.sleep(0.5) + # refresh shouldn't have happened + assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None + + gevent.sleep(5) + # now refresh should have happened + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) + assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message + finally: + test_agent.reset_results() + agent.core.stop() + +# +# def test_unsubscribe_by_tags(volttron_instance, test_agents): +# pub_agent, agent = test_agents +# try: +# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: +# mock_tag_method.return_value = ["devices/campus/b2/d3"] +# +# # Subscribe to subscribe_by_tags instance method and check result +# agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", +# agent.callback_method).get(timeout=5) +# # Publish messages +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) +# +# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers +# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message +# agent.reset_results() +# +# # Unsubscribe and check result +# agent.vip.pubsub.unsubscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", +# agent.callback_method).get(timeout=5) +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) +# assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None +# finally: +# agent.reset_results() +# +# +# def test_publish_by_tags(volttron_instance, test_agents): +# pub_agent, agent = test_agents +# try: +# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: +# mock_tag_method.return_value = ["devices/campus/b2/d4/p1", "devices/campus/b2/d4/p2"] +# +# # Subscribe by prefix +# agent.vip.pubsub.subscribe('pubsub', 'devices/campus/b2/d4/p1', agent.callback_method).get(timeout=5) +# +# agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", +# headers, [75.2, {"units": "F"}]).get(timeout=5) +# +# +# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers +# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message +# agent.reset_results() +# +# # Unsubscribe and check result +# agent.vip.pubsub.unsubscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", agent.callback_method) +# gevent.sleep(0.5) +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) +# gevent.sleep(2) # just to make sure we waited long enough +# assert agent.instance_subscription_results.get("devices/campus/b2/d2/all") is None +# finally: +# agent.reset_results() From 43aae196db5170bc47946e0db7999601df3ab213 Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Mon, 26 Jun 2023 13:40:28 -0700 Subject: [PATCH 05/15] All integration test cases pass --- tests/subsystems/test_pubsub_subsystem.py | 127 ++++++++++++---------- 1 file changed, 70 insertions(+), 57 deletions(-) diff --git a/tests/subsystems/test_pubsub_subsystem.py b/tests/subsystems/test_pubsub_subsystem.py index ac0a03f..6cf73a8 100644 --- a/tests/subsystems/test_pubsub_subsystem.py +++ b/tests/subsystems/test_pubsub_subsystem.py @@ -36,7 +36,6 @@ def reset_results(self): self.subscription_results = {} - @pytest.mark.pubsub def test_publish_from_message_handler(volttron_instance): """ Tests the ability to change a status by sending a different status @@ -131,9 +130,8 @@ def __init__(self, **kwargs): self.subscription_results = dict() self.instance_subscription_results = dict() - @PubSub.subscribe_by_tags('pubsub', 'condition_devices', all_platforms=True) + @PubSub.subscribe_by_tags('pubsub', 'condition_test_class_method') def on_match(self, peer, sender, bus, topic, headers, message): - print("on_match") self.subscription_results[topic] = {'headers': headers, 'message': message} @@ -197,7 +195,8 @@ def test_subscribe_by_tags_instance_method(volttron_instance, test_agents): mock_tag_method.return_value = ["devices/campus/b2/d2"] # Subscribe to subscribe_by_tags instance method and check result - agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", agent.callback_method) + agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", + agent.callback_method).get(timeout=10) # Publish messages pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", headers, @@ -224,8 +223,8 @@ def test_subscribe_by_tags_refresh_tags(volttron_instance, test_agents): mock_tag_method.return_value = ["devices/campus/b2/d2"] # Subscribe to subscribe_by_tags instance method and check result - agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2", agent.callback_method) - gevent.sleep(0.5) + agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2", + agent.callback_method).get(timeout=5) # Publish messages pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) gevent.sleep(0.5) @@ -250,54 +249,68 @@ def test_subscribe_by_tags_refresh_tags(volttron_instance, test_agents): test_agent.reset_results() agent.core.stop() -# -# def test_unsubscribe_by_tags(volttron_instance, test_agents): -# pub_agent, agent = test_agents -# try: -# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: -# mock_tag_method.return_value = ["devices/campus/b2/d3"] -# -# # Subscribe to subscribe_by_tags instance method and check result -# agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", -# agent.callback_method).get(timeout=5) -# # Publish messages -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) -# -# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers -# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message -# agent.reset_results() -# -# # Unsubscribe and check result -# agent.vip.pubsub.unsubscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", -# agent.callback_method).get(timeout=5) -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) -# assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None -# finally: -# agent.reset_results() -# -# -# def test_publish_by_tags(volttron_instance, test_agents): -# pub_agent, agent = test_agents -# try: -# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: -# mock_tag_method.return_value = ["devices/campus/b2/d4/p1", "devices/campus/b2/d4/p2"] -# -# # Subscribe by prefix -# agent.vip.pubsub.subscribe('pubsub', 'devices/campus/b2/d4/p1', agent.callback_method).get(timeout=5) -# -# agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", -# headers, [75.2, {"units": "F"}]).get(timeout=5) -# -# -# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers -# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message -# agent.reset_results() -# -# # Unsubscribe and check result -# agent.vip.pubsub.unsubscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", agent.callback_method) -# gevent.sleep(0.5) -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) -# gevent.sleep(2) # just to make sure we waited long enough -# assert agent.instance_subscription_results.get("devices/campus/b2/d2/all") is None -# finally: -# agent.reset_results() + +def test_unsubscribe_by_tags(volttron_instance, test_agents): + pub_agent, agent = test_agents + try: + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["devices/campus/b2/d3"] + + # Subscribe to subscribe_by_tags instance method and check result + agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", + agent.callback_method).get(timeout=5) + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) + + assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message + agent.reset_results() + + # Unsubscribe and check result + agent.vip.pubsub.unsubscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", + agent.callback_method).get(timeout=5) + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) + gevent.sleep(2) + assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None + finally: + agent.reset_results() + + +def test_publish_by_tags(volttron_instance, test_agents): + pub_agent, agent = test_agents + try: + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["devices/campus/b2/d4/p1"] + + # Subscribe by prefix + agent.vip.pubsub.subscribe('pubsub', 'devices/campus/b2/d4/p1', agent.callback_method).get(timeout=5) + + # publish by tags. should publish to two topics returned by mock method + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", + headers, [75.2, {"units": "F"}]) + gevent.sleep(0.5) + assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] + agent.reset_results() + + mock_tag_method.return_value = ["devices/campus/b2/d4/p1", "devices/campus/b2/d4/p2"] + + # Unsubscribe and check result + agent.vip.pubsub.subscribe_by_tags('pubsub', "tag_condition_device_d4_points", + agent.callback_method).get(timeout=5) + try: + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", + headers, [75.2, {"units": "F"}]) + except ValueError as v: + assert v.args[0] == "tag condition tag_condition_device_d4_points matched multiple topics " \ + "(['devices/campus/b2/d4/p1', 'devices/campus/b2/d4/p2']) but " \ + "publish_multiple is set to false" + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", + headers, [75.2, {"units": "F"}], publish_multiple=True) + gevent.sleep(1) + assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] + assert agent.instance_subscription_results["devices/campus/b2/d4/p2"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d4/p2"]["message"] == [75.2, {"units": "F"}] + finally: + agent.reset_results() From 8cf02c7cccd3889cce06828d7c95944a6ad3d6bf Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Mon, 10 Jul 2023 14:50:29 -0700 Subject: [PATCH 06/15] if peerlist timesout, do not wait for control connection to exit. --- src/volttrontesting/platformwrapper.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/volttrontesting/platformwrapper.py b/src/volttrontesting/platformwrapper.py index a12b606..f814fb6 100644 --- a/src/volttrontesting/platformwrapper.py +++ b/src/volttrontesting/platformwrapper.py @@ -1068,7 +1068,12 @@ def __wait_for_control_connection_to_exit__(self, timeout: int = 10): disconnected = False timer_start = time.time() while not disconnected: - peers = self.dynamic_agent.vip.peerlist().get(timeout=20) + try: + peers = self.dynamic_agent.vip.peerlist().get(timeout=10) + except gevent.Timeout: + self.logit("peerlist call timed out. Exiting loop. " + "Not waiting for control connection to exit.") + break disconnected = CONTROL_CONNECTION not in peers if disconnected: break From 20a8f600b4c4846dc95bf4bbb485344095ff6ea3 Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Mon, 10 Jul 2023 14:51:33 -0700 Subject: [PATCH 07/15] minor --- tests/subsystems/test_pubsub_subsystem.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/subsystems/test_pubsub_subsystem.py b/tests/subsystems/test_pubsub_subsystem.py index 6cf73a8..67ef4f9 100644 --- a/tests/subsystems/test_pubsub_subsystem.py +++ b/tests/subsystems/test_pubsub_subsystem.py @@ -5,6 +5,7 @@ import pytest from mock import MagicMock, patch + from volttron.client.messaging import headers as headers_mod from volttron.client.vip.agent import Agent from volttron.client.vip.agent import PubSub From 9959f0fe164860bf7bd864837b535339967e9a7e Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Tue, 11 Jul 2023 17:04:19 -0700 Subject: [PATCH 08/15] try catch - gevent.time out peerlist call. --- src/volttrontesting/platformwrapper.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/volttrontesting/platformwrapper.py b/src/volttrontesting/platformwrapper.py index a12b606..8007be9 100644 --- a/src/volttrontesting/platformwrapper.py +++ b/src/volttrontesting/platformwrapper.py @@ -1068,7 +1068,12 @@ def __wait_for_control_connection_to_exit__(self, timeout: int = 10): disconnected = False timer_start = time.time() while not disconnected: - peers = self.dynamic_agent.vip.peerlist().get(timeout=20) + try: + peers = self.dynamic_agent.vip.peerlist().get(timeout=10) + except gevent.Timeout: + self.logit("peerlist call timed out. Exiting loop. " + "Not waiting for control connection to exit.") + break disconnected = CONTROL_CONNECTION not in peers if disconnected: break @@ -1351,7 +1356,7 @@ def shutdown_platform(self): return if not self.is_running(): - self.logit(f"Instance not running {self.is_running()} and skip cleanup: {self.skip_cleanup}") + self.logit(f"Instance running {self.is_running()} and skip cleanup: {self.skip_cleanup}") if not self.skip_cleanup: self.__remove_home_directory__() return @@ -1367,6 +1372,10 @@ def shutdown_platform(self): self.remove_all_agents() except gevent.Timeout: self.logit("Timeout getting list of agents") + except RuntimeError as e: + if not self.is_running(): + self.logit("Unable to shutdown agent. instance is already shutdown") + self.logit(f"Error shutting down agent {e}") try: # don't wait indefinitely as shutdown will not throw an error if RMQ is down/has cert errors From 80739fcf735942238eaccafaa8aa317f5f36cddf Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Thu, 20 Jul 2023 14:36:42 -0700 Subject: [PATCH 09/15] integration test for pubsub subsystem. --- tests/subsystems/test_pubsub_subsystem.py | 207 ++++++++++++++++++++-- 1 file changed, 188 insertions(+), 19 deletions(-) diff --git a/tests/subsystems/test_pubsub_subsystem.py b/tests/subsystems/test_pubsub_subsystem.py index 67ef4f9..bb87d6b 100644 --- a/tests/subsystems/test_pubsub_subsystem.py +++ b/tests/subsystems/test_pubsub_subsystem.py @@ -4,7 +4,7 @@ import gevent import pytest from mock import MagicMock, patch - +from pathlib import Path from volttron.client.messaging import headers as headers_mod from volttron.client.vip.agent import Agent @@ -14,9 +14,9 @@ messages_contains_prefix) -class _publish_from_handler_test_agent(Agent): +class PublishFromHandlerTestAgent(Agent): def __init__(self, **kwargs): - super(_publish_from_handler_test_agent, self).__init__(**kwargs) + super(PublishFromHandlerTestAgent, self).__init__(**kwargs) self.subscription_results = {} PubSub.subscribe('pubsub', '') @@ -49,7 +49,7 @@ def test_publish_from_message_handler(volttron_instance): """ test_topic = "testtopic1/test" new_agent1 = volttron_instance.build_agent(identity='test_publish1', - agent_class=_publish_from_handler_test_agent) + agent_class=PublishFromHandlerTestAgent) new_agent2 = volttron_instance.build_agent(identity='test_publish2') @@ -124,14 +124,16 @@ def test_multi_unsubscribe(volttron_instance): assert subscriber_agent.subscription_callback.call_count == 0 -class TestAgent(Agent): +class TestAgentPubsubByTags(Agent): def __init__(self, **kwargs): super().__init__(**kwargs) self.subscription_results = dict() self.instance_subscription_results = dict() - @PubSub.subscribe_by_tags('pubsub', 'condition_test_class_method') + # set topic_source parameter explicitly to empty string as agent is instantiated with + # mock get_topics_by_tag that returns results with the "devices" included in topic str + @PubSub.subscribe_by_tags('pubsub', 'condition_test_class_method', topic_source="") def on_match(self, peer, sender, bus, topic, headers, message): self.subscription_results[topic] = {'headers': headers, 'message': message} @@ -150,7 +152,19 @@ def test_agents(volttron_instance): with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: mock_tag_method.return_value = ["devices/campus/b1/device_test_class_method"] pub_agent = volttron_instance.build_agent() - agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgent) + agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgentPubsubByTags) + gevent.sleep(0.5) + yield pub_agent, agent + pub_agent.core.stop() + agent.core.stop() + + +@pytest.fixture(scope="module") +def test_agents_tagging(volttron_instance): + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["devices/campus/b1/device_test_class_method"] + pub_agent = volttron_instance.build_agent() + agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgentPubsubByTags) gevent.sleep(0.5) yield pub_agent, agent pub_agent.core.stop() @@ -193,9 +207,10 @@ def test_subscribe_by_tags_instance_method(volttron_instance, test_agents): pub_agent, agent = test_agents try: with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: - mock_tag_method.return_value = ["devices/campus/b2/d2"] + mock_tag_method.return_value = ["campus/b2/d2"] # Subscribe to subscribe_by_tags instance method and check result + # test with default topic_source. agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", agent.callback_method).get(timeout=10) @@ -217,15 +232,17 @@ def test_subscribe_by_tags_instance_method(volttron_instance, test_agents): def test_subscribe_by_tags_refresh_tags(volttron_instance, test_agents): pub_agent, test_agent = test_agents - agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgent, tag_refresh_interval=5) + agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgentPubsubByTags, + tag_refresh_interval=5) gevent.sleep(0.5) try: with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: mock_tag_method.return_value = ["devices/campus/b2/d2"] # Subscribe to subscribe_by_tags instance method and check result + # test with topic_source="" as mock return value has "devices" prefix agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2", - agent.callback_method).get(timeout=5) + agent.callback_method, topic_source="").get(timeout=5) # Publish messages pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) gevent.sleep(0.5) @@ -255,7 +272,7 @@ def test_unsubscribe_by_tags(volttron_instance, test_agents): pub_agent, agent = test_agents try: with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: - mock_tag_method.return_value = ["devices/campus/b2/d3"] + mock_tag_method.return_value = ["campus/b2/d3"] # Subscribe to subscribe_by_tags instance method and check result agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", @@ -281,14 +298,15 @@ def test_publish_by_tags(volttron_instance, test_agents): pub_agent, agent = test_agents try: with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: - mock_tag_method.return_value = ["devices/campus/b2/d4/p1"] + mock_tag_method.return_value = ["campus/b2/d4/p1"] # Subscribe by prefix agent.vip.pubsub.subscribe('pubsub', 'devices/campus/b2/d4/p1', agent.callback_method).get(timeout=5) # publish by tags. should publish to two topics returned by mock method + # mock method is returning WITHOUT "devices" prefix similar to tagging service so use default topic_source agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", - headers, [75.2, {"units": "F"}]) + headers=headers, message=[75.2, {"units": "F"}]) gevent.sleep(0.5) assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] @@ -297,17 +315,20 @@ def test_publish_by_tags(volttron_instance, test_agents): mock_tag_method.return_value = ["devices/campus/b2/d4/p1", "devices/campus/b2/d4/p2"] # Unsubscribe and check result - agent.vip.pubsub.subscribe_by_tags('pubsub', "tag_condition_device_d4_points", - agent.callback_method).get(timeout=5) + # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix + agent.vip.pubsub.subscribe_by_tags('pubsub', "tag_condition_device_d4_points", agent.callback_method, + topic_source="").get(timeout=5) try: - agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", - headers, [75.2, {"units": "F"}]) + # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", "", + headers=headers, message=[75.2, {"units": "F"}]) except ValueError as v: assert v.args[0] == "tag condition tag_condition_device_d4_points matched multiple topics " \ "(['devices/campus/b2/d4/p1', 'devices/campus/b2/d4/p2']) but " \ "publish_multiple is set to false" - agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", - headers, [75.2, {"units": "F"}], publish_multiple=True) + # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", "", + headers=headers, message=[75.2, {"units": "F"}], publish_multiple=True) gevent.sleep(1) assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] @@ -315,3 +336,151 @@ def test_publish_by_tags(volttron_instance, test_agents): assert agent.instance_subscription_results["devices/campus/b2/d4/p2"]["message"] == [75.2, {"units": "F"}] finally: agent.reset_results() + + +@pytest.fixture(scope="module") +def tagging_agent(volttron_instance): + query_agent = volttron_instance.build_agent() + config = { + "connection": { + "type": "sqlite", + "params": { + "database": volttron_instance.volttron_home + '/test.sqlite' + } + } + } + historian_vip = 'test.historian' + historian_uuid = volttron_instance.install_agent( + vip_identity=historian_vip, + agent_dir="volttron-sqlite-historian", + config_file=config, + start=True) + gevent.sleep(1) + assert volttron_instance.is_agent_running(historian_uuid) + + config = { + "connection": { + "type": "sqlite", + "params": { + "database": volttron_instance.volttron_home + '/test_tagging.sqlite' + } + }, + "historian_vip_identity": "test.historian" + } + tagging_agent_vip = 'test.tagging' + agent_path = Path(__file__).parents[3].joinpath("volttron-sqlite-tagging") + tagging_agent_id = volttron_instance.install_agent(vip_identity=tagging_agent_vip, + agent_dir=agent_path, + config_file=config) + volttron_instance.start_agent(tagging_agent_id) + gevent.sleep(1) + assert volttron_instance.is_agent_running(tagging_agent_id) + + + now = format_timestamp(datetime.utcnow()) + headers = {headers_mod.DATE: now, + headers_mod.TIMESTAMP: now} + to_send = [{'topic': 'devices/campus1/d2/all', 'headers': headers, + 'message': [ + {'p1': 2, 'p2': 2, 'p3': 1, 'p4': 2, 'p5': 2}]}] + query_agent.vip.rpc.call(historian_vip, 'insert', to_send).get( + timeout=10) + to_send = [{'topic': 'devices/campus1/d1/all', 'headers': headers, + 'message': [ + {'p1': 2, 'p2': 2, 'p3': 1, 'p4': 2, 'p5': 2}]}] + query_agent.vip.rpc.call(historian_vip, 'insert', to_send).get( + timeout=10) + + to_send = [{'topic': 'devices/campus2/d1/all', 'headers': headers, + 'message': [ + {'p1': 2, 'p2': 2, 'p3': 1, 'p4': 2, 'p5': 2}]}] + query_agent.vip.rpc.call(historian_vip, 'insert', to_send).get( + timeout=10) + gevent.sleep(2) + + # 2. Add tags to topics and topic_prefix that can be used for queries + query_agent.vip.rpc.call( + tagging_agent_vip, 'add_topic_tags', topic_prefix='campus1', + tags={'campus': True, 'dis': "Test description", + "geoCountry": "US"}).get(timeout=10) + + query_agent.vip.rpc.call( + tagging_agent_vip, 'add_topic_tags', topic_prefix='campus2', + tags={'campus': True, "geoCountry": "UK"}).get(timeout=10) + + query_agent.vip.rpc.call( + tagging_agent_vip, 'add_tags', + tags={ + 'campus.*/d.*/p1': {'point': True, 'maxVal': 15, 'minVal': -1}, + 'campus.*/d.*/p2': {'point': True, 'maxVal': 10, 'minVal': 0, + 'dis': "Test description"}, + 'campus.*/d.*/p3': {'point': True, 'maxVal': 5, 'minVal': 1, + 'dis': "Test description"}, + 'campus.*/d1': {'equip': True, 'elec': True, 'phase': 'p1_1', + 'dis': "Test description"}, + 'campus.*/d2': {'equip': True, 'elec': True, + 'phase': 'p2'}, + 'campus1/d.*': {'campusRef': 'campus1'}, + 'campus2/d.*': {'campusRef': 'campus2'}}).get(timeout=10) + + query_agent.vip.rpc.call(tagging_agent_vip, 'add_topic_tags', + topic_prefix='campus2/d1', + tags={'phase': "p1_2"}).get(timeout=10) + gevent.sleep(2) + + yield tagging_agent_vip, query_agent + + volttron_instance.stop_agent(tagging_agent_id) + volttron_instance.remove_agent(tagging_agent_id) + + +class TestAgentPubsubByTags2(Agent): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.subscription_results = dict() + self.instance_subscription_results = dict() + + def callback_method(self, peer, sender, bus, topic, headers, message): + self.instance_subscription_results[topic] = {'headers': headers, + 'message': message} + + def reset_results(self): + self.subscription_results = dict() + self.instance_subscription_results = dict() + + +def test_subscribe_by_tags_with_sqlite_tagging_agent(volttron_instance, tagging_agent): + tagging_vip, pub_agent = tagging_agent + gevent.sleep(2) + agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgentPubsubByTags2, + tag_vip_id=tagging_vip) + + try: + result1 = pub_agent.vip.rpc.call( + tagging_vip, 'get_topics_by_tags', + condition='equip AND NOT (phase LIKE "p1.*")').get(timeout=10) + print("Results of AND and OR query with parenthesis: {} ".format( + result1)) + assert len(result1) == 1 + assert result1 == ['campus1/d2'] + + # Subscribe to subscribe_by_tags instance method and check result + result = agent.vip.pubsub.subscribe_by_tags('pubsub', 'equip AND NOT (phase LIKE "p1.*")', + agent.callback_method).get(timeout=20) + + print(f"RESULT of subscribe by tags {result}") + gevent.sleep(1) + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus1/d1/all", headers, + all_message).get(timeout=10) + pub_agent.vip.pubsub.publish('pubsub', "devices/campus1/d2/all", headers, + all_message).get(timeout=10) + gevent.sleep(2) + print(agent.instance_subscription_results) + assert agent.instance_subscription_results["devices/campus1/d2/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus1/d2/all"]["message"] == all_message + assert agent.instance_subscription_results.get("devices/campus1/d1/all") is None + finally: + agent.core.stop() + pub_agent.core.stop() From 3672f55b7e8cb27556ec94bedb4f899335f9d91d Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Thu, 20 Jul 2023 14:39:42 -0700 Subject: [PATCH 10/15] fixed volttron dependency --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3b6086b..246fce7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ pytest = "^6.2.5" mock = "^4.0.3" anypubsub = "^0.6" grequests = "^0.6.0" -#volttron = ">=10.0.3a9,<11.0" +volttron = ">=10.0.3a9,<11.0" docker = "^6.0.1" pytest-timeout = "^2.1.0" @@ -39,7 +39,6 @@ mypy = "^0.942" coverage = "^6.3.2" Sphinx = "^4.5.0" sphinx-rtd-theme = "^1.0.0" -volttron = {path = "../volttron-core", develop = true} [tool.yapfignore] ignore_patterns = [ From 421e318e9ddeeaf8c8296a0ce6aec351902a5239 Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Thu, 20 Jul 2023 14:51:10 -0700 Subject: [PATCH 11/15] fixed few deprecation warnings. --- src/volttrontesting/server_mock.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/volttrontesting/server_mock.py b/src/volttrontesting/server_mock.py index 43cc320..7ee87c1 100644 --- a/src/volttrontesting/server_mock.py +++ b/src/volttrontesting/server_mock.py @@ -78,7 +78,7 @@ def __find_lifecycle_methods__(agent_class) -> List[Tuple[LifeCycleMembers, str] for lcm in LifeCycleMembers: # .enum_members().items(): # Search for @Core.receiver('onstart') # handle cases for weird spacing and multiple lines - term = f"@Core.receiver\s*\(\s*['\"]{lcm.value}['\"]\s*\)\s*" + term = r"@Core.receiver\s*\(\s*['\"]" + lcm.value + r"['\"]\s*\)\s*" m = re.search(term, class_source, re.MULTILINE) # find the actual function following this @@ -87,8 +87,8 @@ def __find_lifecycle_methods__(agent_class) -> List[Tuple[LifeCycleMembers, str] subsource = class_source[m.start():] # We know that the receiver is decorated on the function so we know # that it starts with def and ends with - m2 = re.search("def\s+.*:$", subsource, re.MULTILINE) - m3 = re.search("[a-zA-Z_]+[a-zA-Z_0-9]*\(", m2[0], re.MULTILINE) + m2 = re.search(r"def\s+.*:$", subsource, re.MULTILINE) + m3 = re.search(r"[a-zA-Z_]+[a-zA-Z_0-9]*\(", m2[0], re.MULTILINE) # This is the data we truly want so we can look it up on the members # to find an instance of the callable method. function_name = m2[0][m3.start():m3.end() - 1] From 6922e5555428a982a402ea0cd4297afc8538ed26 Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Thu, 27 Jul 2023 14:07:27 -0700 Subject: [PATCH 12/15] updated based on review comments --- tests/subsystems/test_pubsub_subsystem.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/subsystems/test_pubsub_subsystem.py b/tests/subsystems/test_pubsub_subsystem.py index bb87d6b..44f7558 100644 --- a/tests/subsystems/test_pubsub_subsystem.py +++ b/tests/subsystems/test_pubsub_subsystem.py @@ -323,12 +323,12 @@ def test_publish_by_tags(volttron_instance, test_agents): agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", "", headers=headers, message=[75.2, {"units": "F"}]) except ValueError as v: - assert v.args[0] == "tag condition tag_condition_device_d4_points matched multiple topics " \ - "(['devices/campus/b2/d4/p1', 'devices/campus/b2/d4/p2']) but " \ - "publish_multiple is set to false" + + assert v.args[0] == 'tag condition tag_condition_device_d4_points matched 2 topics but ' \ + 'max_publish_count is set to 1' # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", "", - headers=headers, message=[75.2, {"units": "F"}], publish_multiple=True) + headers=headers, message=[75.2, {"units": "F"}], max_publish_count=2) gevent.sleep(1) assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] From 8cded2443e1563060a52131b67532e690ec91671 Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Fri, 11 Aug 2023 12:35:56 -0700 Subject: [PATCH 13/15] updated test to match api param change --- tests/subsystems/test_pubsub_subsystem.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/subsystems/test_pubsub_subsystem.py b/tests/subsystems/test_pubsub_subsystem.py index 44f7558..62265c6 100644 --- a/tests/subsystems/test_pubsub_subsystem.py +++ b/tests/subsystems/test_pubsub_subsystem.py @@ -320,15 +320,16 @@ def test_publish_by_tags(volttron_instance, test_agents): topic_source="").get(timeout=5) try: # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix - agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", "", - headers=headers, message=[75.2, {"units": "F"}]) + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", + headers=headers, message=[75.2, {"units": "F"}], topic_source="") except ValueError as v: assert v.args[0] == 'tag condition tag_condition_device_d4_points matched 2 topics but ' \ 'max_publish_count is set to 1' # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix - agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", "", - headers=headers, message=[75.2, {"units": "F"}], max_publish_count=2) + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", + headers=headers, message=[75.2, {"units": "F"}], max_publish_count=2, + topic_source="") gevent.sleep(1) assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] From fbb8a4944c8d1f37cb18dc6fd5667221298ce43c Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Fri, 11 Aug 2023 13:00:16 -0700 Subject: [PATCH 14/15] fixed agent package name --- tests/subsystems/test_pubsub_subsystem.py | 447 +++++++++++----------- 1 file changed, 223 insertions(+), 224 deletions(-) diff --git a/tests/subsystems/test_pubsub_subsystem.py b/tests/subsystems/test_pubsub_subsystem.py index 62265c6..a060899 100644 --- a/tests/subsystems/test_pubsub_subsystem.py +++ b/tests/subsystems/test_pubsub_subsystem.py @@ -13,117 +13,117 @@ from volttrontesting.utils import (poll_gevent_sleep, messages_contains_prefix) - -class PublishFromHandlerTestAgent(Agent): - def __init__(self, **kwargs): - super(PublishFromHandlerTestAgent, self).__init__(**kwargs) - self.subscription_results = {} - PubSub.subscribe('pubsub', '') - - @PubSub.subscribe('pubsub', '') - def onmessage(self, peer, sender, bus, topic, headers, message): - self.subscription_results[topic] = {'headers': headers, - 'message': message} - if not topic.startswith("testtopic2/test"): - self.vip.pubsub.publish("pubsub", "testtopic2/test", - headers={"foo": "bar"}, - message="Test message").get(timeout=2.0) - - def setup_callback(self, topic): - self.vip.pubsub.subscribe(peer="pubsub", prefix=topic, - callback=self.onmessage).get(timeout=2.0) - - def reset_results(self): - self.subscription_results = {} - - -@pytest.mark.pubsub -def test_publish_from_message_handler(volttron_instance): - """ Tests the ability to change a status by sending a different status - code. - - This test also tests that the heartbeat is received. - - :param volttron_instance: - :return: - """ - test_topic = "testtopic1/test" - new_agent1 = volttron_instance.build_agent(identity='test_publish1', - agent_class=PublishFromHandlerTestAgent) - - new_agent2 = volttron_instance.build_agent(identity='test_publish2') - - # new_agent1.setup_callback("") - - new_agent2.vip.pubsub.publish("pubsub", test_topic, headers={}, - message="Test message").get() - - poll_gevent_sleep(2, lambda: messages_contains_prefix(test_topic, - new_agent1.subscription_results)) - - assert new_agent1.subscription_results[test_topic][ - "message"] == "Test message" - - -@pytest.mark.pubsub -def test_multi_unsubscribe(volttron_instance): - subscriber_agent = volttron_instance.build_agent() - subscriber_agent.subscription_callback = MagicMock( - callback='subscription_callback') - subscriber_agent.subscription_callback.reset_mock() - - # test unsubscribe all when there are no subscriptions - subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, - callback=None) - - publisher_agent = volttron_instance.build_agent() - - topic_to_check = "testtopic1/test/foo/bar/one" - test_topic1 = "testtopic1/test/foo/bar" - test_topic2 = "testtopic1/test/foo" - test_topic3 = "testtopic1" - - subscriber_agent.vip.pubsub.subscribe( - peer='pubsub', prefix=test_topic1, - callback=subscriber_agent.subscription_callback) - subscriber_agent.vip.pubsub.subscribe( - peer='pubsub', prefix=test_topic2, - callback=subscriber_agent.subscription_callback) - subscriber_agent.vip.pubsub.subscribe( - peer='pubsub', prefix=test_topic3, - callback=subscriber_agent.subscription_callback) - gevent.sleep(1) - - publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, - message="test message 1") - gevent.sleep(1) - - assert subscriber_agent.subscription_callback.call_count == 3 - subscriber_agent.subscription_callback.reset_mock() - - subscriber_agent.vip.pubsub.unsubscribe(peer='pubsub', - prefix="testtopic1/test/foo/bar", - callback=None) - gevent.sleep(1) - - publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, - message="test message 2") - gevent.sleep(1) - - assert subscriber_agent.subscription_callback.call_count == 2 - subscriber_agent.subscription_callback.reset_mock() - - subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, - callback=None) - gevent.sleep(1) - - publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, - message="test message 3") - gevent.sleep(1) - - assert subscriber_agent.subscription_callback.call_count == 0 - - +# +# class PublishFromHandlerTestAgent(Agent): +# def __init__(self, **kwargs): +# super(PublishFromHandlerTestAgent, self).__init__(**kwargs) +# self.subscription_results = {} +# PubSub.subscribe('pubsub', '') +# +# @PubSub.subscribe('pubsub', '') +# def onmessage(self, peer, sender, bus, topic, headers, message): +# self.subscription_results[topic] = {'headers': headers, +# 'message': message} +# if not topic.startswith("testtopic2/test"): +# self.vip.pubsub.publish("pubsub", "testtopic2/test", +# headers={"foo": "bar"}, +# message="Test message").get(timeout=2.0) +# +# def setup_callback(self, topic): +# self.vip.pubsub.subscribe(peer="pubsub", prefix=topic, +# callback=self.onmessage).get(timeout=2.0) +# +# def reset_results(self): +# self.subscription_results = {} +# +# +# @pytest.mark.pubsub +# def test_publish_from_message_handler(volttron_instance): +# """ Tests the ability to change a status by sending a different status +# code. +# +# This test also tests that the heartbeat is received. +# +# :param volttron_instance: +# :return: +# """ +# test_topic = "testtopic1/test" +# new_agent1 = volttron_instance.build_agent(identity='test_publish1', +# agent_class=PublishFromHandlerTestAgent) +# +# new_agent2 = volttron_instance.build_agent(identity='test_publish2') +# +# # new_agent1.setup_callback("") +# +# new_agent2.vip.pubsub.publish("pubsub", test_topic, headers={}, +# message="Test message").get() +# +# poll_gevent_sleep(2, lambda: messages_contains_prefix(test_topic, +# new_agent1.subscription_results)) +# +# assert new_agent1.subscription_results[test_topic][ +# "message"] == "Test message" +# +# +# @pytest.mark.pubsub +# def test_multi_unsubscribe(volttron_instance): +# subscriber_agent = volttron_instance.build_agent() +# subscriber_agent.subscription_callback = MagicMock( +# callback='subscription_callback') +# subscriber_agent.subscription_callback.reset_mock() +# +# # test unsubscribe all when there are no subscriptions +# subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, +# callback=None) +# +# publisher_agent = volttron_instance.build_agent() +# +# topic_to_check = "testtopic1/test/foo/bar/one" +# test_topic1 = "testtopic1/test/foo/bar" +# test_topic2 = "testtopic1/test/foo" +# test_topic3 = "testtopic1" +# +# subscriber_agent.vip.pubsub.subscribe( +# peer='pubsub', prefix=test_topic1, +# callback=subscriber_agent.subscription_callback) +# subscriber_agent.vip.pubsub.subscribe( +# peer='pubsub', prefix=test_topic2, +# callback=subscriber_agent.subscription_callback) +# subscriber_agent.vip.pubsub.subscribe( +# peer='pubsub', prefix=test_topic3, +# callback=subscriber_agent.subscription_callback) +# gevent.sleep(1) +# +# publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, +# message="test message 1") +# gevent.sleep(1) +# +# assert subscriber_agent.subscription_callback.call_count == 3 +# subscriber_agent.subscription_callback.reset_mock() +# +# subscriber_agent.vip.pubsub.unsubscribe(peer='pubsub', +# prefix="testtopic1/test/foo/bar", +# callback=None) +# gevent.sleep(1) +# +# publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, +# message="test message 2") +# gevent.sleep(1) +# +# assert subscriber_agent.subscription_callback.call_count == 2 +# subscriber_agent.subscription_callback.reset_mock() +# +# subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, +# callback=None) +# gevent.sleep(1) +# +# publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, +# message="test message 3") +# gevent.sleep(1) +# +# assert subscriber_agent.subscription_callback.call_count == 0 +# +# class TestAgentPubsubByTags(Agent): def __init__(self, **kwargs): @@ -182,116 +182,116 @@ def test_agents_tagging(volttron_instance): headers_mod.TIMESTAMP: now } - -def test_subscribe_by_tags_class_method(volttron_instance, test_agents, mocker): - pub_agent, agent = test_agents - try: - # TestAgent subscribes to "devices/campus/b1/device_test_class_method" tag condition using the - # @Pusbub.subscribe_by_tags decorator - - # publish to devices and check agent.subscription_results - # Publish messages - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", - headers, all_message).get(timeout=10) - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/d2/all", headers, all_message).get(timeout=10) - gevent.sleep(0.5) - - assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers - assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["message"] == all_message - assert agent.subscription_results.get("devices/campus/b1/d2/all") is None - finally: - agent.reset_results() - - -def test_subscribe_by_tags_instance_method(volttron_instance, test_agents): - pub_agent, agent = test_agents - try: - with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: - mock_tag_method.return_value = ["campus/b2/d2"] - - # Subscribe to subscribe_by_tags instance method and check result - # test with default topic_source. - agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", - agent.callback_method).get(timeout=10) - - # Publish messages - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", headers, - all_message).get(timeout=10) - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) - gevent.sleep(0.5) - - assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers - assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"][ - "message"] == all_message - - assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers - assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message - finally: - agent.reset_results() - - -def test_subscribe_by_tags_refresh_tags(volttron_instance, test_agents): - pub_agent, test_agent = test_agents - agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgentPubsubByTags, - tag_refresh_interval=5) - gevent.sleep(0.5) - try: - with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: - mock_tag_method.return_value = ["devices/campus/b2/d2"] - - # Subscribe to subscribe_by_tags instance method and check result - # test with topic_source="" as mock return value has "devices" prefix - agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2", - agent.callback_method, topic_source="").get(timeout=5) - # Publish messages - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) - gevent.sleep(0.5) - - assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers - assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message - agent.reset_results() - - # Update tag query result - mock_tag_method.return_value = ["devices/campus/b2/d2", "devices/campus/b2/d3"] - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) - gevent.sleep(0.5) - # refresh shouldn't have happened - assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None - - gevent.sleep(5) - # now refresh should have happened - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) - assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers - assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message - finally: - test_agent.reset_results() - agent.core.stop() - - -def test_unsubscribe_by_tags(volttron_instance, test_agents): - pub_agent, agent = test_agents - try: - with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: - mock_tag_method.return_value = ["campus/b2/d3"] - - # Subscribe to subscribe_by_tags instance method and check result - agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", - agent.callback_method).get(timeout=5) - # Publish messages - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) - - assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers - assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message - agent.reset_results() - - # Unsubscribe and check result - agent.vip.pubsub.unsubscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", - agent.callback_method).get(timeout=5) - pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) - gevent.sleep(2) - assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None - finally: - agent.reset_results() +# +# def test_subscribe_by_tags_class_method(volttron_instance, test_agents, mocker): +# pub_agent, agent = test_agents +# try: +# # TestAgent subscribes to "devices/campus/b1/device_test_class_method" tag condition using the +# # @Pusbub.subscribe_by_tags decorator +# +# # publish to devices and check agent.subscription_results +# # Publish messages +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", +# headers, all_message).get(timeout=10) +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/d2/all", headers, all_message).get(timeout=10) +# gevent.sleep(0.5) +# +# assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers +# assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["message"] == all_message +# assert agent.subscription_results.get("devices/campus/b1/d2/all") is None +# finally: +# agent.reset_results() +# +# +# def test_subscribe_by_tags_instance_method(volttron_instance, test_agents): +# pub_agent, agent = test_agents +# try: +# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: +# mock_tag_method.return_value = ["campus/b2/d2"] +# +# # Subscribe to subscribe_by_tags instance method and check result +# # test with default topic_source. +# agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", +# agent.callback_method).get(timeout=10) +# +# # Publish messages +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", headers, +# all_message).get(timeout=10) +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) +# gevent.sleep(0.5) +# +# assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers +# assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"][ +# "message"] == all_message +# +# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers +# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message +# finally: +# agent.reset_results() +# +# +# def test_subscribe_by_tags_refresh_tags(volttron_instance, test_agents): +# pub_agent, test_agent = test_agents +# agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgentPubsubByTags, +# tag_refresh_interval=5) +# gevent.sleep(0.5) +# try: +# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: +# mock_tag_method.return_value = ["devices/campus/b2/d2"] +# +# # Subscribe to subscribe_by_tags instance method and check result +# # test with topic_source="" as mock return value has "devices" prefix +# agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2", +# agent.callback_method, topic_source="").get(timeout=5) +# # Publish messages +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) +# gevent.sleep(0.5) +# +# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers +# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message +# agent.reset_results() +# +# # Update tag query result +# mock_tag_method.return_value = ["devices/campus/b2/d2", "devices/campus/b2/d3"] +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) +# gevent.sleep(0.5) +# # refresh shouldn't have happened +# assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None +# +# gevent.sleep(5) +# # now refresh should have happened +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) +# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers +# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message +# finally: +# test_agent.reset_results() +# agent.core.stop() +# +# +# def test_unsubscribe_by_tags(volttron_instance, test_agents): +# pub_agent, agent = test_agents +# try: +# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: +# mock_tag_method.return_value = ["campus/b2/d3"] +# +# # Subscribe to subscribe_by_tags instance method and check result +# agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", +# agent.callback_method).get(timeout=5) +# # Publish messages +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) +# +# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers +# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message +# agent.reset_results() +# +# # Unsubscribe and check result +# agent.vip.pubsub.unsubscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", +# agent.callback_method).get(timeout=5) +# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) +# gevent.sleep(2) +# assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None +# finally: +# agent.reset_results() def test_publish_by_tags(volttron_instance, test_agents): @@ -369,9 +369,8 @@ def tagging_agent(volttron_instance): "historian_vip_identity": "test.historian" } tagging_agent_vip = 'test.tagging' - agent_path = Path(__file__).parents[3].joinpath("volttron-sqlite-tagging") tagging_agent_id = volttron_instance.install_agent(vip_identity=tagging_agent_vip, - agent_dir=agent_path, + agent_dir="volttron-sqlite-tagging", config_file=config) volttron_instance.start_agent(tagging_agent_id) gevent.sleep(1) @@ -477,7 +476,7 @@ def test_subscribe_by_tags_with_sqlite_tagging_agent(volttron_instance, tagging_ all_message).get(timeout=10) pub_agent.vip.pubsub.publish('pubsub', "devices/campus1/d2/all", headers, all_message).get(timeout=10) - gevent.sleep(2) + gevent.sleep(3) print(agent.instance_subscription_results) assert agent.instance_subscription_results["devices/campus1/d2/all"]["headers"] == headers assert agent.instance_subscription_results["devices/campus1/d2/all"]["message"] == all_message From 0bb24fefaaa586c6dccfa66f64624a4fb8148103 Mon Sep 17 00:00:00 2001 From: Chandrika Sivaramakrishnan Date: Fri, 11 Aug 2023 14:03:47 -0700 Subject: [PATCH 15/15] split mocked and non mocked tests into two separate modules --- tests/subsystems/test_pubsub.py | 282 +++++++++++ tests/subsystems/test_pubsub_mock_tagging.py | 225 +++++++++ tests/subsystems/test_pubsub_subsystem.py | 486 ------------------- 3 files changed, 507 insertions(+), 486 deletions(-) create mode 100644 tests/subsystems/test_pubsub.py create mode 100644 tests/subsystems/test_pubsub_mock_tagging.py delete mode 100644 tests/subsystems/test_pubsub_subsystem.py diff --git a/tests/subsystems/test_pubsub.py b/tests/subsystems/test_pubsub.py new file mode 100644 index 0000000..58ca0ee --- /dev/null +++ b/tests/subsystems/test_pubsub.py @@ -0,0 +1,282 @@ +from datetime import datetime + +import gevent +import pytest +from mock import MagicMock, patch +from pathlib import Path + +from volttron.client.messaging import headers as headers_mod +from volttron.client.vip.agent import Agent +from volttron.client.vip.agent import PubSub +from volttron.utils import format_timestamp +from volttrontesting.utils import (poll_gevent_sleep, + messages_contains_prefix) + + +all_message = [{'OutsideAirTemperature': 0.5, + 'MixedAirTemperature': 0.2}, + {'OutsideAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'}, + 'MixedAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'} + }] +now = format_timestamp(datetime.utcnow()) +headers = { + headers_mod.DATE: now, + headers_mod.TIMESTAMP: now + } + +class PublishFromHandlerTestAgent(Agent): + def __init__(self, **kwargs): + super(PublishFromHandlerTestAgent, self).__init__(**kwargs) + self.subscription_results = {} + PubSub.subscribe('pubsub', '') + + @PubSub.subscribe('pubsub', '') + def onmessage(self, peer, sender, bus, topic, headers, message): + self.subscription_results[topic] = {'headers': headers, + 'message': message} + if not topic.startswith("testtopic2/test"): + self.vip.pubsub.publish("pubsub", "testtopic2/test", + headers={"foo": "bar"}, + message="Test message").get(timeout=2.0) + + def setup_callback(self, topic): + self.vip.pubsub.subscribe(peer="pubsub", prefix=topic, + callback=self.onmessage).get(timeout=2.0) + + def reset_results(self): + self.subscription_results = {} + + +@pytest.mark.pubsub +def test_publish_from_message_handler(volttron_instance): + """ Tests the ability to change a status by sending a different status + code. + + This test also tests that the heartbeat is received. + + :param volttron_instance: + :return: + """ + test_topic = "testtopic1/test" + new_agent1 = volttron_instance.build_agent(identity='test_publish1', + agent_class=PublishFromHandlerTestAgent) + + new_agent2 = volttron_instance.build_agent(identity='test_publish2') + + # new_agent1.setup_callback("") + + new_agent2.vip.pubsub.publish("pubsub", test_topic, headers={}, + message="Test message").get() + + poll_gevent_sleep(2, lambda: messages_contains_prefix(test_topic, + new_agent1.subscription_results)) + + assert new_agent1.subscription_results[test_topic][ + "message"] == "Test message" + + +@pytest.mark.pubsub +def test_multi_unsubscribe(volttron_instance): + subscriber_agent = volttron_instance.build_agent() + subscriber_agent.subscription_callback = MagicMock( + callback='subscription_callback') + subscriber_agent.subscription_callback.reset_mock() + + # test unsubscribe all when there are no subscriptions + subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, + callback=None) + + publisher_agent = volttron_instance.build_agent() + + topic_to_check = "testtopic1/test/foo/bar/one" + test_topic1 = "testtopic1/test/foo/bar" + test_topic2 = "testtopic1/test/foo" + test_topic3 = "testtopic1" + + subscriber_agent.vip.pubsub.subscribe( + peer='pubsub', prefix=test_topic1, + callback=subscriber_agent.subscription_callback) + subscriber_agent.vip.pubsub.subscribe( + peer='pubsub', prefix=test_topic2, + callback=subscriber_agent.subscription_callback) + subscriber_agent.vip.pubsub.subscribe( + peer='pubsub', prefix=test_topic3, + callback=subscriber_agent.subscription_callback) + gevent.sleep(1) + + publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, + message="test message 1") + gevent.sleep(1) + + assert subscriber_agent.subscription_callback.call_count == 3 + subscriber_agent.subscription_callback.reset_mock() + + subscriber_agent.vip.pubsub.unsubscribe(peer='pubsub', + prefix="testtopic1/test/foo/bar", + callback=None) + gevent.sleep(1) + + publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, + message="test message 2") + gevent.sleep(1) + + assert subscriber_agent.subscription_callback.call_count == 2 + subscriber_agent.subscription_callback.reset_mock() + + subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, + callback=None) + gevent.sleep(1) + + publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, + message="test message 3") + gevent.sleep(1) + + assert subscriber_agent.subscription_callback.call_count == 0 + + + +@pytest.fixture(scope="module") +def tagging_agent(volttron_instance): + query_agent = volttron_instance.build_agent() + config = { + "connection": { + "type": "sqlite", + "params": { + "database": volttron_instance.volttron_home + '/test.sqlite' + } + } + } + historian_vip = 'test.historian' + historian_uuid = volttron_instance.install_agent( + vip_identity=historian_vip, + agent_dir="volttron-sqlite-historian", + config_file=config, + start=True) + gevent.sleep(1) + assert volttron_instance.is_agent_running(historian_uuid) + + config = { + "connection": { + "type": "sqlite", + "params": { + "database": volttron_instance.volttron_home + '/test_tagging.sqlite' + } + }, + "historian_vip_identity": "test.historian" + } + tagging_agent_vip = 'test.tagging' + tagging_agent_id = volttron_instance.install_agent(vip_identity=tagging_agent_vip, + agent_dir="volttron-sqlite-tagging", + config_file=config) + volttron_instance.start_agent(tagging_agent_id) + gevent.sleep(1) + assert volttron_instance.is_agent_running(tagging_agent_id) + + + now = format_timestamp(datetime.utcnow()) + headers = {headers_mod.DATE: now, + headers_mod.TIMESTAMP: now} + to_send = [{'topic': 'devices/campus1/d2/all', 'headers': headers, + 'message': [ + {'p1': 2, 'p2': 2, 'p3': 1, 'p4': 2, 'p5': 2}]}] + query_agent.vip.rpc.call(historian_vip, 'insert', to_send).get( + timeout=10) + to_send = [{'topic': 'devices/campus1/d1/all', 'headers': headers, + 'message': [ + {'p1': 2, 'p2': 2, 'p3': 1, 'p4': 2, 'p5': 2}]}] + query_agent.vip.rpc.call(historian_vip, 'insert', to_send).get( + timeout=10) + + to_send = [{'topic': 'devices/campus2/d1/all', 'headers': headers, + 'message': [ + {'p1': 2, 'p2': 2, 'p3': 1, 'p4': 2, 'p5': 2}]}] + query_agent.vip.rpc.call(historian_vip, 'insert', to_send).get( + timeout=10) + gevent.sleep(2) + + # 2. Add tags to topics and topic_prefix that can be used for queries + query_agent.vip.rpc.call( + tagging_agent_vip, 'add_topic_tags', topic_prefix='campus1', + tags={'campus': True, 'dis': "Test description", + "geoCountry": "US"}).get(timeout=10) + + query_agent.vip.rpc.call( + tagging_agent_vip, 'add_topic_tags', topic_prefix='campus2', + tags={'campus': True, "geoCountry": "UK"}).get(timeout=10) + + query_agent.vip.rpc.call( + tagging_agent_vip, 'add_tags', + tags={ + 'campus.*/d.*/p1': {'point': True, 'maxVal': 15, 'minVal': -1}, + 'campus.*/d.*/p2': {'point': True, 'maxVal': 10, 'minVal': 0, + 'dis': "Test description"}, + 'campus.*/d.*/p3': {'point': True, 'maxVal': 5, 'minVal': 1, + 'dis': "Test description"}, + 'campus.*/d1': {'equip': True, 'elec': True, 'phase': 'p1_1', + 'dis': "Test description"}, + 'campus.*/d2': {'equip': True, 'elec': True, + 'phase': 'p2'}, + 'campus1/d.*': {'campusRef': 'campus1'}, + 'campus2/d.*': {'campusRef': 'campus2'}}).get(timeout=10) + + query_agent.vip.rpc.call(tagging_agent_vip, 'add_topic_tags', + topic_prefix='campus2/d1', + tags={'phase': "p1_2"}).get(timeout=10) + gevent.sleep(2) + + yield tagging_agent_vip, query_agent + + volttron_instance.stop_agent(tagging_agent_id) + volttron_instance.remove_agent(tagging_agent_id) + + +class TestAgentPubsubByTags2(Agent): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.subscription_results = dict() + self.instance_subscription_results = dict() + + def callback_method(self, peer, sender, bus, topic, headers, message): + self.instance_subscription_results[topic] = {'headers': headers, + 'message': message} + + def reset_results(self): + self.subscription_results = dict() + self.instance_subscription_results = dict() + + +def test_subscribe_by_tags_with_sqlite_tagging_agent(volttron_instance, tagging_agent): + tagging_vip, pub_agent = tagging_agent + gevent.sleep(2) + agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgentPubsubByTags2, + tag_vip_id=tagging_vip) + + try: + result1 = pub_agent.vip.rpc.call( + tagging_vip, 'get_topics_by_tags', + condition='equip AND NOT (phase LIKE "p1.*")').get(timeout=10) + print("Results of AND and OR query with parenthesis: {} ".format( + result1)) + assert len(result1) == 1 + assert result1 == ['campus1/d2'] + + # Subscribe to subscribe_by_tags instance method and check result + result = agent.vip.pubsub.subscribe_by_tags('pubsub', 'equip AND NOT (phase LIKE "p1.*")', + agent.callback_method).get(timeout=20) + + print(f"RESULT of subscribe by tags {result}") + gevent.sleep(1) + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus1/d1/all", headers, + all_message).get(timeout=10) + pub_agent.vip.pubsub.publish('pubsub', "devices/campus1/d2/all", headers, + all_message).get(timeout=10) + gevent.sleep(2) + print(agent.instance_subscription_results) + assert agent.instance_subscription_results["devices/campus1/d2/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus1/d2/all"]["message"] == all_message + assert agent.instance_subscription_results.get("devices/campus1/d1/all") is None + finally: + agent.core.stop() + pub_agent.core.stop() diff --git a/tests/subsystems/test_pubsub_mock_tagging.py b/tests/subsystems/test_pubsub_mock_tagging.py new file mode 100644 index 0000000..6a21ab4 --- /dev/null +++ b/tests/subsystems/test_pubsub_mock_tagging.py @@ -0,0 +1,225 @@ +from datetime import datetime +from unittest import mock + +import gevent +import pytest + +from volttron.client.messaging import headers as headers_mod +from volttron.client.vip.agent import Agent +from volttron.client.vip.agent import PubSub +from volttron.utils import format_timestamp + + +class TestAgentPubsubByTags(Agent): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.subscription_results = dict() + self.instance_subscription_results = dict() + + # set topic_source parameter explicitly to empty string as agent is instantiated with + # mock get_topics_by_tag that returns results with the "devices" included in topic str + @PubSub.subscribe_by_tags('pubsub', 'condition_test_class_method', topic_source="") + def on_match(self, peer, sender, bus, topic, headers, message): + self.subscription_results[topic] = {'headers': headers, + 'message': message} + + def callback_method(self, peer, sender, bus, topic, headers, message): + self.instance_subscription_results[topic] = {'headers': headers, + 'message': message} + + def reset_results(self): + self.subscription_results = dict() + self.instance_subscription_results = dict() + + +@pytest.fixture(scope="module") +def test_agents(volttron_instance): + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["devices/campus/b1/device_test_class_method"] + pub_agent = volttron_instance.build_agent() + agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgentPubsubByTags) + gevent.sleep(0.5) + yield pub_agent, agent + pub_agent.core.stop() + agent.core.stop() + + +@pytest.fixture(scope="module") +def test_agents_tagging(volttron_instance): + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["devices/campus/b1/device_test_class_method"] + pub_agent = volttron_instance.build_agent() + agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgentPubsubByTags) + gevent.sleep(0.5) + yield pub_agent, agent + pub_agent.core.stop() + agent.core.stop() + + +all_message = [{'OutsideAirTemperature': 0.5, + 'MixedAirTemperature': 0.2}, + {'OutsideAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'}, + 'MixedAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'} + }] +now = format_timestamp(datetime.utcnow()) +headers = { + headers_mod.DATE: now, + headers_mod.TIMESTAMP: now + } + + +def test_subscribe_by_tags_class_method(volttron_instance, test_agents, mocker): + pub_agent, agent = test_agents + try: + # TestAgent subscribes to "devices/campus/b1/device_test_class_method" tag condition using the + # @Pusbub.subscribe_by_tags decorator + + # publish to devices and check agent.subscription_results + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", + headers, all_message).get(timeout=10) + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/d2/all", headers, all_message).get(timeout=10) + gevent.sleep(0.5) + + assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers + assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["message"] == all_message + assert agent.subscription_results.get("devices/campus/b1/d2/all") is None + finally: + agent.reset_results() + + +def test_subscribe_by_tags_instance_method(volttron_instance, test_agents): + pub_agent, agent = test_agents + try: + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["campus/b2/d2"] + + # Subscribe to subscribe_by_tags instance method and check result + # test with default topic_source. + agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", + agent.callback_method).get(timeout=10) + + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", headers, + all_message).get(timeout=10) + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) + gevent.sleep(0.5) + + assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers + assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"][ + "message"] == all_message + + assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message + finally: + agent.reset_results() + + +def test_subscribe_by_tags_refresh_tags(volttron_instance, test_agents): + pub_agent, test_agent = test_agents + agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgentPubsubByTags, + tag_refresh_interval=5) + gevent.sleep(0.5) + try: + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["devices/campus/b2/d2"] + + # Subscribe to subscribe_by_tags instance method and check result + # test with topic_source="" as mock return value has "devices" prefix + agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2", + agent.callback_method, topic_source="").get(timeout=5) + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) + gevent.sleep(0.5) + + assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message + agent.reset_results() + + # Update tag query result + mock_tag_method.return_value = ["devices/campus/b2/d2", "devices/campus/b2/d3"] + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) + gevent.sleep(0.5) + # refresh shouldn't have happened + assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None + + gevent.sleep(5) + # now refresh should have happened + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) + assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message + finally: + test_agent.reset_results() + agent.core.stop() + + +def test_unsubscribe_by_tags(volttron_instance, test_agents): + pub_agent, agent = test_agents + try: + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["campus/b2/d3"] + + # Subscribe to subscribe_by_tags instance method and check result + agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", + agent.callback_method).get(timeout=5) + # Publish messages + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) + + assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message + agent.reset_results() + + # Unsubscribe and check result + agent.vip.pubsub.unsubscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", + agent.callback_method).get(timeout=5) + pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) + gevent.sleep(2) + assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None + finally: + agent.reset_results() + + +def test_publish_by_tags(volttron_instance, test_agents): + pub_agent, agent = test_agents + try: + with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: + mock_tag_method.return_value = ["campus/b2/d4/p1"] + + # Subscribe by prefix + agent.vip.pubsub.subscribe('pubsub', 'devices/campus/b2/d4/p1', agent.callback_method).get(timeout=5) + + # publish by tags. should publish to two topics returned by mock method + # mock method is returning WITHOUT "devices" prefix similar to tagging service so use default topic_source + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", + headers=headers, message=[75.2, {"units": "F"}]) + gevent.sleep(0.5) + assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] + agent.reset_results() + + mock_tag_method.return_value = ["devices/campus/b2/d4/p1", "devices/campus/b2/d4/p2"] + + # Unsubscribe and check result + # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix + agent.vip.pubsub.subscribe_by_tags('pubsub', "tag_condition_device_d4_points", agent.callback_method, + topic_source="").get(timeout=5) + try: + # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", + headers=headers, message=[75.2, {"units": "F"}], topic_source="") + except ValueError as v: + + assert v.args[0] == 'tag condition tag_condition_device_d4_points matched 2 topics but ' \ + 'max_publish_count is set to 1' + # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix + agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", + headers=headers, message=[75.2, {"units": "F"}], max_publish_count=2, + topic_source="") + gevent.sleep(1) + assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] + assert agent.instance_subscription_results["devices/campus/b2/d4/p2"]["headers"] == headers + assert agent.instance_subscription_results["devices/campus/b2/d4/p2"]["message"] == [75.2, {"units": "F"}] + finally: + agent.reset_results() diff --git a/tests/subsystems/test_pubsub_subsystem.py b/tests/subsystems/test_pubsub_subsystem.py deleted file mode 100644 index a060899..0000000 --- a/tests/subsystems/test_pubsub_subsystem.py +++ /dev/null @@ -1,486 +0,0 @@ -from datetime import datetime -from unittest import mock - -import gevent -import pytest -from mock import MagicMock, patch -from pathlib import Path - -from volttron.client.messaging import headers as headers_mod -from volttron.client.vip.agent import Agent -from volttron.client.vip.agent import PubSub -from volttron.utils import format_timestamp -from volttrontesting.utils import (poll_gevent_sleep, - messages_contains_prefix) - -# -# class PublishFromHandlerTestAgent(Agent): -# def __init__(self, **kwargs): -# super(PublishFromHandlerTestAgent, self).__init__(**kwargs) -# self.subscription_results = {} -# PubSub.subscribe('pubsub', '') -# -# @PubSub.subscribe('pubsub', '') -# def onmessage(self, peer, sender, bus, topic, headers, message): -# self.subscription_results[topic] = {'headers': headers, -# 'message': message} -# if not topic.startswith("testtopic2/test"): -# self.vip.pubsub.publish("pubsub", "testtopic2/test", -# headers={"foo": "bar"}, -# message="Test message").get(timeout=2.0) -# -# def setup_callback(self, topic): -# self.vip.pubsub.subscribe(peer="pubsub", prefix=topic, -# callback=self.onmessage).get(timeout=2.0) -# -# def reset_results(self): -# self.subscription_results = {} -# -# -# @pytest.mark.pubsub -# def test_publish_from_message_handler(volttron_instance): -# """ Tests the ability to change a status by sending a different status -# code. -# -# This test also tests that the heartbeat is received. -# -# :param volttron_instance: -# :return: -# """ -# test_topic = "testtopic1/test" -# new_agent1 = volttron_instance.build_agent(identity='test_publish1', -# agent_class=PublishFromHandlerTestAgent) -# -# new_agent2 = volttron_instance.build_agent(identity='test_publish2') -# -# # new_agent1.setup_callback("") -# -# new_agent2.vip.pubsub.publish("pubsub", test_topic, headers={}, -# message="Test message").get() -# -# poll_gevent_sleep(2, lambda: messages_contains_prefix(test_topic, -# new_agent1.subscription_results)) -# -# assert new_agent1.subscription_results[test_topic][ -# "message"] == "Test message" -# -# -# @pytest.mark.pubsub -# def test_multi_unsubscribe(volttron_instance): -# subscriber_agent = volttron_instance.build_agent() -# subscriber_agent.subscription_callback = MagicMock( -# callback='subscription_callback') -# subscriber_agent.subscription_callback.reset_mock() -# -# # test unsubscribe all when there are no subscriptions -# subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, -# callback=None) -# -# publisher_agent = volttron_instance.build_agent() -# -# topic_to_check = "testtopic1/test/foo/bar/one" -# test_topic1 = "testtopic1/test/foo/bar" -# test_topic2 = "testtopic1/test/foo" -# test_topic3 = "testtopic1" -# -# subscriber_agent.vip.pubsub.subscribe( -# peer='pubsub', prefix=test_topic1, -# callback=subscriber_agent.subscription_callback) -# subscriber_agent.vip.pubsub.subscribe( -# peer='pubsub', prefix=test_topic2, -# callback=subscriber_agent.subscription_callback) -# subscriber_agent.vip.pubsub.subscribe( -# peer='pubsub', prefix=test_topic3, -# callback=subscriber_agent.subscription_callback) -# gevent.sleep(1) -# -# publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, -# message="test message 1") -# gevent.sleep(1) -# -# assert subscriber_agent.subscription_callback.call_count == 3 -# subscriber_agent.subscription_callback.reset_mock() -# -# subscriber_agent.vip.pubsub.unsubscribe(peer='pubsub', -# prefix="testtopic1/test/foo/bar", -# callback=None) -# gevent.sleep(1) -# -# publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, -# message="test message 2") -# gevent.sleep(1) -# -# assert subscriber_agent.subscription_callback.call_count == 2 -# subscriber_agent.subscription_callback.reset_mock() -# -# subscriber_agent.vip.pubsub.unsubscribe("pubsub", prefix=None, -# callback=None) -# gevent.sleep(1) -# -# publisher_agent.vip.pubsub.publish(peer="pubsub", topic=topic_to_check, -# message="test message 3") -# gevent.sleep(1) -# -# assert subscriber_agent.subscription_callback.call_count == 0 -# -# -class TestAgentPubsubByTags(Agent): - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.subscription_results = dict() - self.instance_subscription_results = dict() - - # set topic_source parameter explicitly to empty string as agent is instantiated with - # mock get_topics_by_tag that returns results with the "devices" included in topic str - @PubSub.subscribe_by_tags('pubsub', 'condition_test_class_method', topic_source="") - def on_match(self, peer, sender, bus, topic, headers, message): - self.subscription_results[topic] = {'headers': headers, - 'message': message} - - def callback_method(self, peer, sender, bus, topic, headers, message): - self.instance_subscription_results[topic] = {'headers': headers, - 'message': message} - - def reset_results(self): - self.subscription_results = dict() - self.instance_subscription_results = dict() - - -@pytest.fixture(scope="module") -def test_agents(volttron_instance): - with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: - mock_tag_method.return_value = ["devices/campus/b1/device_test_class_method"] - pub_agent = volttron_instance.build_agent() - agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgentPubsubByTags) - gevent.sleep(0.5) - yield pub_agent, agent - pub_agent.core.stop() - agent.core.stop() - - -@pytest.fixture(scope="module") -def test_agents_tagging(volttron_instance): - with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: - mock_tag_method.return_value = ["devices/campus/b1/device_test_class_method"] - pub_agent = volttron_instance.build_agent() - agent = volttron_instance.build_agent(identity="test-agent", agent_class=TestAgentPubsubByTags) - gevent.sleep(0.5) - yield pub_agent, agent - pub_agent.core.stop() - agent.core.stop() - - -all_message = [{'OutsideAirTemperature': 0.5, - 'MixedAirTemperature': 0.2}, - {'OutsideAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'}, - 'MixedAirTemperature': {'units': 'F', 'tz': 'UTC', 'type': 'float'} - }] -now = format_timestamp(datetime.utcnow()) -headers = { - headers_mod.DATE: now, - headers_mod.TIMESTAMP: now - } - -# -# def test_subscribe_by_tags_class_method(volttron_instance, test_agents, mocker): -# pub_agent, agent = test_agents -# try: -# # TestAgent subscribes to "devices/campus/b1/device_test_class_method" tag condition using the -# # @Pusbub.subscribe_by_tags decorator -# -# # publish to devices and check agent.subscription_results -# # Publish messages -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", -# headers, all_message).get(timeout=10) -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/d2/all", headers, all_message).get(timeout=10) -# gevent.sleep(0.5) -# -# assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers -# assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["message"] == all_message -# assert agent.subscription_results.get("devices/campus/b1/d2/all") is None -# finally: -# agent.reset_results() -# -# -# def test_subscribe_by_tags_instance_method(volttron_instance, test_agents): -# pub_agent, agent = test_agents -# try: -# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: -# mock_tag_method.return_value = ["campus/b2/d2"] -# -# # Subscribe to subscribe_by_tags instance method and check result -# # test with default topic_source. -# agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d2", -# agent.callback_method).get(timeout=10) -# -# # Publish messages -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b1/device_test_class_method/all", headers, -# all_message).get(timeout=10) -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) -# gevent.sleep(0.5) -# -# assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"]["headers"] == headers -# assert agent.subscription_results["devices/campus/b1/device_test_class_method/all"][ -# "message"] == all_message -# -# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers -# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message -# finally: -# agent.reset_results() -# -# -# def test_subscribe_by_tags_refresh_tags(volttron_instance, test_agents): -# pub_agent, test_agent = test_agents -# agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgentPubsubByTags, -# tag_refresh_interval=5) -# gevent.sleep(0.5) -# try: -# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: -# mock_tag_method.return_value = ["devices/campus/b2/d2"] -# -# # Subscribe to subscribe_by_tags instance method and check result -# # test with topic_source="" as mock return value has "devices" prefix -# agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2", -# agent.callback_method, topic_source="").get(timeout=5) -# # Publish messages -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d2/all", headers, all_message).get(timeout=10) -# gevent.sleep(0.5) -# -# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["headers"] == headers -# assert agent.instance_subscription_results["devices/campus/b2/d2/all"]["message"] == all_message -# agent.reset_results() -# -# # Update tag query result -# mock_tag_method.return_value = ["devices/campus/b2/d2", "devices/campus/b2/d3"] -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) -# gevent.sleep(0.5) -# # refresh shouldn't have happened -# assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None -# -# gevent.sleep(5) -# # now refresh should have happened -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) -# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers -# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message -# finally: -# test_agent.reset_results() -# agent.core.stop() -# -# -# def test_unsubscribe_by_tags(volttron_instance, test_agents): -# pub_agent, agent = test_agents -# try: -# with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: -# mock_tag_method.return_value = ["campus/b2/d3"] -# -# # Subscribe to subscribe_by_tags instance method and check result -# agent.vip.pubsub.subscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", -# agent.callback_method).get(timeout=5) -# # Publish messages -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) -# -# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["headers"] == headers -# assert agent.instance_subscription_results["devices/campus/b2/d3/all"]["message"] == all_message -# agent.reset_results() -# -# # Unsubscribe and check result -# agent.vip.pubsub.unsubscribe_by_tags('pubsub', "condition_devices/campus/b2/d3", -# agent.callback_method).get(timeout=5) -# pub_agent.vip.pubsub.publish('pubsub', "devices/campus/b2/d3/all", headers, all_message).get(timeout=10) -# gevent.sleep(2) -# assert agent.instance_subscription_results.get("devices/campus/b2/d3/all") is None -# finally: -# agent.reset_results() - - -def test_publish_by_tags(volttron_instance, test_agents): - pub_agent, agent = test_agents - try: - with mock.patch.object(PubSub, "get_topics_by_tag") as mock_tag_method: - mock_tag_method.return_value = ["campus/b2/d4/p1"] - - # Subscribe by prefix - agent.vip.pubsub.subscribe('pubsub', 'devices/campus/b2/d4/p1', agent.callback_method).get(timeout=5) - - # publish by tags. should publish to two topics returned by mock method - # mock method is returning WITHOUT "devices" prefix similar to tagging service so use default topic_source - agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", - headers=headers, message=[75.2, {"units": "F"}]) - gevent.sleep(0.5) - assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers - assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] - agent.reset_results() - - mock_tag_method.return_value = ["devices/campus/b2/d4/p1", "devices/campus/b2/d4/p2"] - - # Unsubscribe and check result - # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix - agent.vip.pubsub.subscribe_by_tags('pubsub', "tag_condition_device_d4_points", agent.callback_method, - topic_source="").get(timeout=5) - try: - # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix - agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", - headers=headers, message=[75.2, {"units": "F"}], topic_source="") - except ValueError as v: - - assert v.args[0] == 'tag condition tag_condition_device_d4_points matched 2 topics but ' \ - 'max_publish_count is set to 1' - # explicitly sending topic_source as empty as mock get_topics_by_tag is returning with "devices" prefix - agent.vip.pubsub.publish_by_tags('pubsub', "tag_condition_device_d4_points", - headers=headers, message=[75.2, {"units": "F"}], max_publish_count=2, - topic_source="") - gevent.sleep(1) - assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["headers"] == headers - assert agent.instance_subscription_results["devices/campus/b2/d4/p1"]["message"] == [75.2, {"units": "F"}] - assert agent.instance_subscription_results["devices/campus/b2/d4/p2"]["headers"] == headers - assert agent.instance_subscription_results["devices/campus/b2/d4/p2"]["message"] == [75.2, {"units": "F"}] - finally: - agent.reset_results() - - -@pytest.fixture(scope="module") -def tagging_agent(volttron_instance): - query_agent = volttron_instance.build_agent() - config = { - "connection": { - "type": "sqlite", - "params": { - "database": volttron_instance.volttron_home + '/test.sqlite' - } - } - } - historian_vip = 'test.historian' - historian_uuid = volttron_instance.install_agent( - vip_identity=historian_vip, - agent_dir="volttron-sqlite-historian", - config_file=config, - start=True) - gevent.sleep(1) - assert volttron_instance.is_agent_running(historian_uuid) - - config = { - "connection": { - "type": "sqlite", - "params": { - "database": volttron_instance.volttron_home + '/test_tagging.sqlite' - } - }, - "historian_vip_identity": "test.historian" - } - tagging_agent_vip = 'test.tagging' - tagging_agent_id = volttron_instance.install_agent(vip_identity=tagging_agent_vip, - agent_dir="volttron-sqlite-tagging", - config_file=config) - volttron_instance.start_agent(tagging_agent_id) - gevent.sleep(1) - assert volttron_instance.is_agent_running(tagging_agent_id) - - - now = format_timestamp(datetime.utcnow()) - headers = {headers_mod.DATE: now, - headers_mod.TIMESTAMP: now} - to_send = [{'topic': 'devices/campus1/d2/all', 'headers': headers, - 'message': [ - {'p1': 2, 'p2': 2, 'p3': 1, 'p4': 2, 'p5': 2}]}] - query_agent.vip.rpc.call(historian_vip, 'insert', to_send).get( - timeout=10) - to_send = [{'topic': 'devices/campus1/d1/all', 'headers': headers, - 'message': [ - {'p1': 2, 'p2': 2, 'p3': 1, 'p4': 2, 'p5': 2}]}] - query_agent.vip.rpc.call(historian_vip, 'insert', to_send).get( - timeout=10) - - to_send = [{'topic': 'devices/campus2/d1/all', 'headers': headers, - 'message': [ - {'p1': 2, 'p2': 2, 'p3': 1, 'p4': 2, 'p5': 2}]}] - query_agent.vip.rpc.call(historian_vip, 'insert', to_send).get( - timeout=10) - gevent.sleep(2) - - # 2. Add tags to topics and topic_prefix that can be used for queries - query_agent.vip.rpc.call( - tagging_agent_vip, 'add_topic_tags', topic_prefix='campus1', - tags={'campus': True, 'dis': "Test description", - "geoCountry": "US"}).get(timeout=10) - - query_agent.vip.rpc.call( - tagging_agent_vip, 'add_topic_tags', topic_prefix='campus2', - tags={'campus': True, "geoCountry": "UK"}).get(timeout=10) - - query_agent.vip.rpc.call( - tagging_agent_vip, 'add_tags', - tags={ - 'campus.*/d.*/p1': {'point': True, 'maxVal': 15, 'minVal': -1}, - 'campus.*/d.*/p2': {'point': True, 'maxVal': 10, 'minVal': 0, - 'dis': "Test description"}, - 'campus.*/d.*/p3': {'point': True, 'maxVal': 5, 'minVal': 1, - 'dis': "Test description"}, - 'campus.*/d1': {'equip': True, 'elec': True, 'phase': 'p1_1', - 'dis': "Test description"}, - 'campus.*/d2': {'equip': True, 'elec': True, - 'phase': 'p2'}, - 'campus1/d.*': {'campusRef': 'campus1'}, - 'campus2/d.*': {'campusRef': 'campus2'}}).get(timeout=10) - - query_agent.vip.rpc.call(tagging_agent_vip, 'add_topic_tags', - topic_prefix='campus2/d1', - tags={'phase': "p1_2"}).get(timeout=10) - gevent.sleep(2) - - yield tagging_agent_vip, query_agent - - volttron_instance.stop_agent(tagging_agent_id) - volttron_instance.remove_agent(tagging_agent_id) - - -class TestAgentPubsubByTags2(Agent): - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.subscription_results = dict() - self.instance_subscription_results = dict() - - def callback_method(self, peer, sender, bus, topic, headers, message): - self.instance_subscription_results[topic] = {'headers': headers, - 'message': message} - - def reset_results(self): - self.subscription_results = dict() - self.instance_subscription_results = dict() - - -def test_subscribe_by_tags_with_sqlite_tagging_agent(volttron_instance, tagging_agent): - tagging_vip, pub_agent = tagging_agent - gevent.sleep(2) - agent = volttron_instance.build_agent(identity="test-agent-2", agent_class=TestAgentPubsubByTags2, - tag_vip_id=tagging_vip) - - try: - result1 = pub_agent.vip.rpc.call( - tagging_vip, 'get_topics_by_tags', - condition='equip AND NOT (phase LIKE "p1.*")').get(timeout=10) - print("Results of AND and OR query with parenthesis: {} ".format( - result1)) - assert len(result1) == 1 - assert result1 == ['campus1/d2'] - - # Subscribe to subscribe_by_tags instance method and check result - result = agent.vip.pubsub.subscribe_by_tags('pubsub', 'equip AND NOT (phase LIKE "p1.*")', - agent.callback_method).get(timeout=20) - - print(f"RESULT of subscribe by tags {result}") - gevent.sleep(1) - # Publish messages - pub_agent.vip.pubsub.publish('pubsub', "devices/campus1/d1/all", headers, - all_message).get(timeout=10) - pub_agent.vip.pubsub.publish('pubsub', "devices/campus1/d2/all", headers, - all_message).get(timeout=10) - gevent.sleep(3) - print(agent.instance_subscription_results) - assert agent.instance_subscription_results["devices/campus1/d2/all"]["headers"] == headers - assert agent.instance_subscription_results["devices/campus1/d2/all"]["message"] == all_message - assert agent.instance_subscription_results.get("devices/campus1/d1/all") is None - finally: - agent.core.stop() - pub_agent.core.stop()