From e76fc788735b6d6b52184cddf4dd57fe7f82b771 Mon Sep 17 00:00:00 2001 From: Patrick Deziel Date: Mon, 16 Oct 2023 14:15:24 -0500 Subject: [PATCH] Improve project info --- pyensign/ensign.py | 45 ++++----- pyensign/events.py | 11 +++ pyensign/projects.py | 50 ++++++++++ pyensign/topics.py | 121 +++++++++++++++++++++++ pyensign/utils/topics.py | 20 ---- pyensign/utils/ulid.py | 16 ++++ tests/pyensign/test_connection.py | 80 +++++++++++++++- tests/pyensign/test_ensign.py | 154 +++++++++++++++++++++++++++--- tests/pyensign/test_stream.py | 3 +- 9 files changed, 438 insertions(+), 62 deletions(-) create mode 100644 pyensign/projects.py create mode 100644 pyensign/topics.py create mode 100644 pyensign/utils/ulid.py diff --git a/pyensign/ensign.py b/pyensign/ensign.py index a75487a..d389f1c 100644 --- a/pyensign/ensign.py +++ b/pyensign/ensign.py @@ -2,14 +2,27 @@ import json import inspect from datetime import timedelta +from typing import ( + List, + Optional, + Union, + AsyncGenerator, + Dict, + Any, + Coroutine, + Callable, +) +from ulid import ULID +from pyensign.topics import Topic +from pyensign.projects import Project from pyensign.connection import Client from pyensign.events import from_object from pyensign.status import ServerStatus from pyensign.api.v1beta1.query import format_query -from pyensign.utils.topics import Topic, TopicCache +from pyensign.utils.topics import TopicCache from pyensign.connection import Connection -from pyensign.api.v1beta1 import topic_pb2, query_pb2 +from pyensign.api.v1beta1 import topic_pb2 from pyensign.auth.client import AuthClient from pyensign.enum import ( TopicState, @@ -26,22 +39,6 @@ EnsignTopicNotFoundError, ) -from typing import ( - List, - Tuple, - Optional, - Union, - AsyncGenerator, - Dict, - Any, - Iterable, - Coroutine, - Type, - Callable, -) - -from ulid import ULID - class Ensign: """ @@ -591,11 +588,9 @@ async def set_topic_sharding_strategy( state = await self.client.set_topic_sharding_strategy(id, strategy) return TopicState.convert(state.state) - async def info( - self, topic_ids: List[str] = [] - ) -> Any: # Placeholder for return type + async def info(self, topic_ids: List[str] = []) -> Any: """ - Get aggregated statistics for topics in the project. + Get information about the project and topics in the project. Parameters ---------- @@ -605,8 +600,8 @@ async def info( Returns ------- - api.v1beta1.ProjectInfo - The aggregated statistics for the topics in the project. + projects.Project + The project info. """ if not isinstance(topic_ids, list): @@ -620,7 +615,7 @@ async def info( except ValueError: raise ValueError(f"not parseable as a topic ID: {id}") - return await self.client.info(topics) + return Project.from_info(await self.client.info(topics)) async def status(self) -> ServerStatus: """ diff --git a/pyensign/events.py b/pyensign/events.py index 20458ef..00decdb 100644 --- a/pyensign/events.py +++ b/pyensign/events.py @@ -336,6 +336,17 @@ def __repr__(self): def __str__(self): return "{} v{}".format(self.name, self.semver()) + @classmethod + def convert(cls, pb_val): + """ + Convert a protocol buffer Type from its protocol buffer representation. + """ + type = cls(pb_val.name) + type.major_version = pb_val.major_version + type.minor_version = pb_val.minor_version + type.patch_version = pb_val.patch_version + return type + @total_ordering class EventState(Enum): diff --git a/pyensign/projects.py b/pyensign/projects.py new file mode 100644 index 0000000..ffdb22b --- /dev/null +++ b/pyensign/projects.py @@ -0,0 +1,50 @@ +from pyensign.utils import ulid +from pyensign.topics import Topic + + +class Project(object): + """ + A project is a collection of topics. Similar to a traditional database, if you have + access to the project, you have access to all of its topics. + """ + + def __init__(self, id): + self.id = ulid.parse(id) + self.num_topics = 0 + self.num_readonly_topics = 0 + self.events = 0 + self.duplicates = 0 + self.data_size_bytes = 0 + self.topics = [] + + def __repr__(self): + return "Project(id={})".format(self.id) + + def __str__(self): + s = "Project" + s += "\n\tid={}".format(self.id) + s += "\n\tnum_topics={}".format(self.num_topics) + s += "\n\tnum_readonly_topics={}".format(self.num_readonly_topics) + s += "\n\tevents={}".format(self.events) + s += "\n\tduplicates={}".format(self.duplicates) + s += "\n\tdata_size_bytes={}".format(self.data_size_bytes) + for topic in self.topics: + s += "\n\t" + s += str(topic).replace("\t", "\t\t") + return s + + @classmethod + def from_info(cls, pb_val): + """ + Convert a protocol buffer ProjectInfo into a Project. + """ + + project = cls(pb_val.project_id) + project.num_topics = pb_val.num_topics + project.num_readonly_topics = pb_val.num_readonly_topics + project.events = pb_val.events + project.duplicates = pb_val.duplicates + project.data_size_bytes = pb_val.data_size_bytes + for topic in pb_val.topics: + project.topics.append(Topic.from_info(topic)) + return project diff --git a/pyensign/topics.py b/pyensign/topics.py new file mode 100644 index 0000000..3e14edb --- /dev/null +++ b/pyensign/topics.py @@ -0,0 +1,121 @@ +from pyensign.utils import ulid +from pyensign.events import Type + + +class Topic(object): + """ + A Topic is a stream of ordered events. Topics have a human readable name but also + have a unique ID. + """ + + def __init__(self, id=None, name="", topic_str=""): + if id: + self.id = ulid.parse(id) + else: + self.id = None + self.name = name + self.topic_str = topic_str + self.project_id = None + self.event_offset_id = None + self.events = 0 + self.duplicates = 0 + self.data_size_bytes = 0 + self.types = [] + + def __hash__(self): + if self.id is None: + raise ValueError("cannot hash topic with no ID") + return hash(str(self.id)) + + def __eq__(self, other): + return self.id == other.id + + def __repr__(self): + repr = "Topic(" + if self.id: + repr += "id={}, ".format(self.id) + if self.name: + repr += "name={}, ".format(self.name) + if self.topic_str: + repr += "topic_str={}".format(self.topic_str) + repr += ")" + return repr + + def __str__(self): + s = "Topic" + if self.id: + s += "\n\tid={}".format(self.id) + if self.name: + s += "\n\tname={}".format(self.name) + if self.project_id: + s += "\n\tproject_id={}".format(self.project_id) + if self.event_offset_id: + s += "\n\tevent_offset_id={}".format(self.event_offset_id) + if self.topic_str: + s += "\n\ttopic_str={}".format(self.topic_str) + s += "\n\tevents={}".format(self.events) + s += "\n\tduplicates={}".format(self.duplicates) + s += "\n\tdata_size_bytes={}".format(self.data_size_bytes) + s += "\n\ttypes:{}".format(len(self.types)) + for type in self.types: + s += "\n\t" + s += str(type).replace("\t", "\t\t") + return s + + @classmethod + def from_info(cls, pb_val): + """ + Convert a protocol buffer TopicInfo into a Topic. + """ + + topic = cls(id=pb_val.topic_id) + topic.project_id = ulid.parse(pb_val.project_id) + topic.event_offset_id = pb_val.event_offset_id + topic.events = pb_val.events + topic.duplicates = pb_val.duplicates + topic.data_size_bytes = pb_val.data_size_bytes + for type in pb_val.types: + topic.types.append(EventType.from_info(type)) + return topic + + +class EventType(object): + """ + An EventType represents a type of event that was published to a topic, which + includes the schema type and the MIME type. + """ + + def __init__(self, type, mimetype): + self.type = type + self.mimetype = mimetype + self.events = 0 + self.duplicates = 0 + self.data_size_bytes = 0 + + def __repr__(self): + repr = "EventType(" + repr += "type={}".format(self.type) + repr += ", mimetype={}".format(self.mimetype) + repr += ")" + return repr + + def __str__(self): + s = "EventType" + s += "\n\ttype={}".format(self.type) + s += "\n\tmimetype={}".format(self.mimetype) + s += "\n\tevents={}".format(self.events) + s += "\n\tduplicates={}".format(self.duplicates) + s += "\n\tdata_size_bytes={}".format(self.data_size_bytes) + return s + + @classmethod + def from_info(cls, pb_val): + """ + Convert a protocol buffer EventType into an EventType. + """ + + type = cls(Type.convert(pb_val.type), pb_val.mimetype) + type.events = pb_val.events + type.duplicates = pb_val.duplicates + type.data_size_bytes = pb_val.data_size_bytes + return type diff --git a/pyensign/utils/topics.py b/pyensign/utils/topics.py index b4e789e..54dbc40 100644 --- a/pyensign/utils/topics.py +++ b/pyensign/utils/topics.py @@ -4,26 +4,6 @@ from pyensign.exceptions import CacheMissError -class Topic: - """ - Topics have a user-defined name but are also unique by ULID. This class stores both - representations to make topics easier to work with. - """ - - def __init__(self, id=None, name="", topic_str=""): - self.id = id - self.name = name - self.topic_str = topic_str - - def __hash__(self): - if self.id is None: - raise ValueError("cannot hash topic with no ID") - return hash(str(self.id)) - - def __eq__(self, other): - return self.id == other.id - - class TopicCache(Cache): """ TopicCache extends the functionality of the Cache class to support topic ID parsing. diff --git a/pyensign/utils/ulid.py b/pyensign/utils/ulid.py new file mode 100644 index 0000000..b593c22 --- /dev/null +++ b/pyensign/utils/ulid.py @@ -0,0 +1,16 @@ +from ulid import ULID + + +def parse(id): + """ + Parse a ULID from its string or bytes representation. + """ + + if isinstance(id, ULID): + return id + elif isinstance(id, str): + return ULID.from_str(id) + elif isinstance(id, bytes): + return ULID.from_bytes(id) + else: + raise TypeError("cannot parse ULID from {}".format(type(id))) diff --git a/tests/pyensign/test_connection.py b/tests/pyensign/test_connection.py index 88afbc6..069cb7d 100644 --- a/tests/pyensign/test_connection.py +++ b/tests/pyensign/test_connection.py @@ -10,7 +10,7 @@ from grpc.aio._interceptor import InterceptedUnaryStreamCall from pyensign.events import Event -from pyensign.utils.topics import Topic +from pyensign.topics import Topic from pyensign.connection import Client from pyensign.utils.cache import Cache from pyensign.api.v1beta1 import event_pb2 @@ -19,6 +19,7 @@ from pyensign.auth.client import AuthClient from pyensign.api.v1beta1 import ensign_pb2 from pyensign.api.v1beta1 import ensign_pb2_grpc +from pyensign.mimetype.v1beta1 import mimetype_pb2 from pyensign.exceptions import ( UnknownTopicError, @@ -359,11 +360,69 @@ def SetTopicPolicy(self, request, context): @authorize @user_agent def Info(self, request, context): + project_id = ULID().bytes return ensign_pb2.ProjectInfo( - project_id=ULID().bytes, + project_id=project_id, num_topics=3, num_readonly_topics=1, events=100, + duplicates=10, + data_size_bytes=1024, + topics=[ + topic_pb2.TopicInfo( + topic_id=ULID().bytes, + project_id=project_id, + event_offset_id=ULID().bytes, + events=60, + duplicates=6, + data_size_bytes=512, + types=[ + topic_pb2.EventTypeInfo( + type=event_pb2.Type( + name="message", + major_version=1, + minor_version=2, + patch_version=3, + ), + mimetype=mimetype_pb2.TEXT_PLAIN, + events=60, + duplicates=6, + data_size_bytes=512, + ), + ], + ), + topic_pb2.TopicInfo( + topic_id=ULID().bytes, + project_id=project_id, + event_offset_id=ULID().bytes, + events=40, + duplicates=4, + data_size_bytes=512, + types=[ + topic_pb2.EventTypeInfo( + type=event_pb2.Type( + name="data", + major_version=1, + patch_version=2, + ), + mimetype=mimetype_pb2.APPLICATION_JSON, + events=30, + duplicates=3, + data_size_bytes=256, + ), + topic_pb2.EventTypeInfo( + type=event_pb2.Type( + name="model", + major_version=2, + ), + mimetype=mimetype_pb2.APPLICATION_PYTHON_PICKLE, + events=10, + duplicates=1, + data_size_bytes=256, + ), + ], + ), + ], ) @user_agent @@ -889,6 +948,23 @@ async def test_info(self, client): assert info.num_topics > 0 assert info.num_readonly_topics > 0 assert info.events > 0 + assert info.duplicates > 0 + assert info.data_size_bytes > 0 + assert len(info.topics) > 0 + for topic in info.topics: + assert ULID.from_bytes(topic.topic_id) is not None + assert ULID.from_bytes(topic.project_id) is not None + assert ULID.from_bytes(topic.event_offset_id) is not None + assert topic.events > 0 + assert topic.duplicates > 0 + assert topic.data_size_bytes > 0 + assert len(topic.types) > 0 + for type in topic.types: + assert len(type.type.name) > 0 + assert type.mimetype > 0 + assert type.events > 0 + assert type.duplicates > 0 + assert type.data_size_bytes > 0 @pytest.mark.asyncio async def test_status(self, client): diff --git a/tests/pyensign/test_ensign.py b/tests/pyensign/test_ensign.py index f748f53..59eff23 100644 --- a/tests/pyensign/test_ensign.py +++ b/tests/pyensign/test_ensign.py @@ -9,8 +9,8 @@ from pyensign.events import Event from pyensign.connection import Cursor -from pyensign.utils.topics import Topic -from pyensign.api.v1beta1 import ensign_pb2, topic_pb2, query_pb2 +from pyensign.topics import Topic +from pyensign.api.v1beta1 import ensign_pb2, topic_pb2, query_pb2, event_pb2 from pyensign.ensign import Ensign, authenticate, publisher, subscriber from pyensign.enum import TopicState from pyensign.exceptions import ( @@ -827,33 +827,159 @@ async def test_topic_exists(self, mock_exists, ensign): @pytest.mark.asyncio @patch("pyensign.connection.Client.info") async def test_info(self, mock_info, ensign): + project_id = ULID() expected = ensign_pb2.ProjectInfo( - project_id=ULID().bytes, - num_topics=3, + project_id=project_id.bytes, + num_topics=2, num_readonly_topics=1, events=100, + duplicates=10, + data_size_bytes=1024, + topics=[ + topic_pb2.TopicInfo( + topic_id=ULID().bytes, + project_id=project_id.bytes, + event_offset_id=ULID().bytes, + events=60, + duplicates=6, + data_size_bytes=512, + types=[ + topic_pb2.EventTypeInfo( + type=event_pb2.Type( + name="message", + major_version=1, + minor_version=2, + patch_version=3, + ), + mimetype=MIME.TEXT_PLAIN, + events=60, + duplicates=6, + data_size_bytes=512, + ), + ], + ), + topic_pb2.TopicInfo( + topic_id=ULID().bytes, + project_id=project_id.bytes, + event_offset_id=ULID().bytes, + events=40, + duplicates=4, + data_size_bytes=512, + types=[ + topic_pb2.EventTypeInfo( + type=event_pb2.Type( + name="data", + major_version=1, + patch_version=2, + ), + mimetype=MIME.APPLICATION_JSON, + events=30, + duplicates=3, + data_size_bytes=256, + ), + topic_pb2.EventTypeInfo( + type=event_pb2.Type( + name="model", + major_version=2, + ), + mimetype=MIME.APPLICATION_PYTHON_PICKLE, + events=10, + duplicates=1, + data_size_bytes=256, + ), + ], + ), + ], ) mock_info.return_value = expected - actual = await ensign.info() - assert actual == expected + info = await ensign.info() + assert info.id == project_id + assert info.num_topics > 0 + assert info.num_readonly_topics > 0 + assert info.events > 0 + assert info.duplicates > 0 + assert info.data_size_bytes > 0 + assert len(info.topics) > 0 + for topic in info.topics: + assert isinstance(topic.id, ULID) + assert isinstance(topic.project_id, ULID) + assert isinstance(topic.event_offset_id, bytes) + assert topic.events > 0 + assert topic.duplicates > 0 + assert topic.data_size_bytes > 0 + assert len(topic.types) > 0 + for type in topic.types: + assert len(type.type.semver()) > 0 + assert type.mimetype > 0 + assert type.events > 0 + assert type.duplicates > 0 + assert type.data_size_bytes > 0 @pytest.mark.asyncio @patch("pyensign.connection.Client.info") async def test_info_filter(self, mock_info, ensign): + project_id = ULID() expected = ensign_pb2.ProjectInfo( - project_id=ULID().bytes, - num_topics=3, + project_id=project_id.bytes, + num_topics=1, num_readonly_topics=1, events=100, + duplicates=10, + data_size_bytes=1024, + topics=[ + topic_pb2.TopicInfo( + topic_id=ULID().bytes, + project_id=project_id.bytes, + event_offset_id=ULID().bytes, + events=60, + duplicates=6, + data_size_bytes=512, + types=[ + topic_pb2.EventTypeInfo( + type=event_pb2.Type( + name="message", + major_version=1, + minor_version=2, + patch_version=3, + ), + mimetype=MIME.TEXT_PLAIN, + events=60, + duplicates=6, + data_size_bytes=512, + ), + ], + ), + ], ) mock_info.return_value = expected - topic_ids = [str(ULID()), str(ULID())] - actual = await ensign.info(topic_ids=topic_ids) + topic_ids = [str(ULID())] + info = await ensign.info(topic_ids=topic_ids) # Ensure that ID bytes, not ULIDs, are passed to the client id_bytes = [ULID.from_str(id).bytes for id in topic_ids] mock_info.assert_called_once_with(id_bytes) - assert actual == expected + + assert info.id == project_id + assert info.num_topics > 0 + assert info.num_readonly_topics > 0 + assert info.events > 0 + assert info.duplicates > 0 + assert info.data_size_bytes > 0 + assert len(info.topics) > 0 + for topic in info.topics: + assert isinstance(topic.id, ULID) + assert isinstance(topic.project_id, ULID) + assert isinstance(topic.event_offset_id, bytes) + assert topic.events > 0 + assert topic.duplicates > 0 + assert topic.data_size_bytes > 0 + assert len(topic.types) > 0 + for type in topic.types: + assert len(type.type.semver()) > 0 + assert type.mimetype > 0 + assert type.events > 0 + assert type.duplicates > 0 + assert type.data_size_bytes > 0 @pytest.mark.asyncio @pytest.mark.parametrize( @@ -889,7 +1015,7 @@ async def test_topic_exists_no_cache(self, mock_exists, ensign_no_cache): {"strategy": topic_pb2.Deduplication.DATAGRAM, "offset": 2}, {"strategy": topic_pb2.Deduplication.STRICT, "offset": "latest"}, {"strategy": "unique_field", "fields": ["foo", "bar"]}, - {"strategy": "Strict", "offset": topic_pb2.Deduplication.OFFSET_EARLIEST} + {"strategy": "Strict", "offset": topic_pb2.Deduplication.OFFSET_EARLIEST}, ], ) async def test_set_topic_deduplication_policy(self, mock_set_policy, args, ensign): @@ -902,7 +1028,9 @@ async def test_set_topic_deduplication_policy(self, mock_set_policy, args, ensig @pytest.mark.parametrize( "strategy", [ - 2, "CONSISTENT_KEY_HASH", topic_pb2.ShardingStrategy.RANDOM, + 2, + "CONSISTENT_KEY_HASH", + topic_pb2.ShardingStrategy.RANDOM, ], ) async def test_set_topic_sharding_strategy(self, mock_set_policy, strategy, ensign): diff --git a/tests/pyensign/test_stream.py b/tests/pyensign/test_stream.py index 942da2b..d5fb191 100644 --- a/tests/pyensign/test_stream.py +++ b/tests/pyensign/test_stream.py @@ -6,14 +6,13 @@ from datetime import timedelta from unittest.mock import Mock +from pyensign.topics import Topic from pyensign.api.v1beta1.event import wrap, unwrap from pyensign.api.v1beta1 import ensign_pb2 from pyensign.stream import StreamHandler, Publisher, Subscriber -from pyensign.iterator import ResponseIterator from pyensign.utils.queue import BidiQueue from pyensign.exceptions import EnsignTimeoutError, EnsignTypeError from pyensign.events import Event, EventState, from_proto -from pyensign.utils.topics import Topic class TestStreamHandler: