Skip to content

Commit

Permalink
Parse topic from protobuf (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdeziel authored Nov 10, 2023
1 parent 66c82d5 commit 721b5c7
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 7 deletions.
8 changes: 5 additions & 3 deletions pyensign/ensign.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,12 @@ async def explain_query(self, query, params, include_duplicates=False):

async def get_topics(self) -> List[Any]:
"""
Get all topics.
Get a list of topics in the project. To get a greater level of detail with
data usage and event counts, use the `info()` method instead.
Yields
------
api.v1beta1.topic_pb2.Topic
List[Topic]
The topics.
"""

Expand All @@ -377,7 +378,8 @@ async def get_topics(self) -> List[Any]:
token = ""
while page is None or token != "":
page, token = await self.client.list_topics(next_page_token=token)
topics.extend(page)
for topic in page:
topics.append(Topic.from_proto(topic))
return topics

async def create_topic(self, topic_name: str) -> str:
Expand Down
84 changes: 84 additions & 0 deletions pyensign/topics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pyensign.utils import ulid
from pyensign.events import Type
from pyensign.utils import pbtime
from pyensign.enum import TopicState, DeduplicationStrategy, OffsetPosition


class Topic(object):
Expand All @@ -20,7 +22,12 @@ def __init__(self, id=None, name="", topic_str=""):
self.events = 0
self.duplicates = 0
self.data_size_bytes = 0
self.offset = 0
self.status = None
self.deduplication = None
self.types = []
self.created = None
self.modified = None

def __hash__(self):
if self.id is None:
Expand All @@ -47,6 +54,8 @@ def __str__(self):
s += "\n\tid={}".format(self.id)
if self.name:
s += "\n\tname={}".format(self.name)
if self.status:
s += "\n\tstatus={}".format(self.status)
if self.project_id:
s += "\n\tproject_id={}".format(self.project_id)
if self.event_offset_id:
Expand All @@ -56,12 +65,32 @@ def __str__(self):
s += "\n\tevents={}".format(self.events)
s += "\n\tduplicates={}".format(self.duplicates)
s += "\n\tdata_size_bytes={}".format(self.data_size_bytes)
s += "\n\toffset={}".format(self.offset)
s += "\n\ttypes:{}".format(len(self.types))
for type in self.types:
s += "\n\t"
s += str(type).replace("\t", "\t\t")
if self.created:
s += "\n\tcreated={}".format(self.created)
if self.modified:
s += "\n\tmodified={}".format(self.modified)
return s

@classmethod
def from_proto(cls, pb_val):
"""
Convert a protocol buffer Topic into a Topic.
"""

topic = cls(id=pb_val.id, name=pb_val.name)
topic.project_id = ulid.parse(pb_val.project_id)
topic.offset = pb_val.offset
topic.status = TopicState.convert(pb_val.status)
topic.deduplication = Deduplication.convert(pb_val.deduplication)
topic.created = pbtime.to_datetime(pb_val.created)
topic.modified = pbtime.to_datetime(pb_val.modified)
return topic

@classmethod
def from_info(cls, pb_val):
"""
Expand All @@ -79,6 +108,61 @@ def from_info(cls, pb_val):
return topic


class Deduplication(object):
"""
Deduplication stores information about how a topic is deduplicated, including the
configured deduplication strategy and offset position.
"""

def __init__(
self,
strategy=DeduplicationStrategy.NONE,
offset=OffsetPosition.OFFSET_EARLIEST,
keys=None,
fields=None,
overwrite_duplicate=False,
):
self.strategy = strategy
self.offset = offset
self.keys = keys
self.fields = fields
self.overwrite_duplicate = overwrite_duplicate

def __repr__(self):
repr = "Deduplication("
repr += "strategy={}, ".format(self.strategy)
repr += "offset={},".format(self.offset)
repr += "keys={}, ".format(self.keys)
repr += "fields={}, ".format(self.fields)
repr += "overwrite_duplicate={}".format(self.overwrite_duplicate)
repr += ")"
return repr

def __str__(self):
s = "Deduplication"
s += "\n\tstrategy={}".format(self.strategy)
s += "\n\toffset={}".format(self.offset)
s += "\n\tkeys={}".format(self.keys)
s += "\n\tfields={}".format(self.fields)
s += "\n\toverwrite_duplicate={}".format(self.overwrite_duplicate)
return s

@classmethod
def convert(cls, pb_val):
"""
Convert a protocol buffer Deduplication into a Deduplication.
"""

dedup = cls(
strategy=DeduplicationStrategy.convert(pb_val.strategy),
offset=OffsetPosition.convert(pb_val.offset),
keys=pb_val.keys,
fields=pb_val.fields,
overwrite_duplicate=pb_val.overwrite_duplicate,
)
return dedup


class EventType(object):
"""
An EventType represents a type of event that was published to a topic, which
Expand Down
41 changes: 37 additions & 4 deletions tests/pyensign/test_ensign.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import os
import json
import time
import pickle
import pytest
import asyncio
from datetime import datetime

from ulid import ULID
from unittest import mock
from asyncmock import patch
from google.protobuf.timestamp_pb2 import Timestamp

from pyensign.events import Event
from pyensign.connection import Cursor
from pyensign.topics import Topic
from pyensign.api.v1beta1 import ensign_pb2, topic_pb2, query_pb2, event_pb2
from pyensign.ensign import Ensign, authenticate, publisher, subscriber
from pyensign.enum import TopicState
from pyensign.enum import TopicState, DeduplicationStrategy, OffsetPosition
from pyensign.exceptions import (
EnsignTopicCreateError,
EnsignTopicDestroyError,
Expand Down Expand Up @@ -849,10 +853,39 @@ async def test_query_params(self, mock_en_sql, params, exception, ensign):
@pytest.mark.asyncio
@patch("pyensign.connection.Client.list_topics")
async def test_get_topics(self, mock_list, ensign):
topics = [topic_pb2.Topic(id=ULID().bytes, name="otters")]
topic_id = ULID()
project_id = ULID()
topics = [
topic_pb2.Topic(
id=topic_id.bytes,
project_id=project_id.bytes,
name="otters",
offset=42,
status=topic_pb2.TopicState.READY,
deduplication=topic_pb2.Deduplication(
strategy=topic_pb2.Deduplication.Strategy.KEY_GROUPED,
offset=topic_pb2.Deduplication.OffsetPosition.OFFSET_EARLIEST,
keys=["foo"],
overwrite_duplicate=True,
),
created=Timestamp(seconds=int(time.time())),
modified=Timestamp(seconds=int(time.time())),
)
]
mock_list.return_value = (topics, "")
recv = await ensign.get_topics()
assert recv[0].name == topics[0].name
actual = await ensign.get_topics()
assert len(actual) == 1
assert actual[0].id == topic_id
assert actual[0].project_id == project_id
assert actual[0].name == "otters"
assert actual[0].offset == 42
assert actual[0].status == TopicState.READY
assert actual[0].deduplication.strategy == DeduplicationStrategy.KEY_GROUPED
assert actual[0].deduplication.offset == OffsetPosition.OFFSET_EARLIEST
assert actual[0].deduplication.keys == ["foo"]
assert actual[0].deduplication.overwrite_duplicate
assert isinstance(actual[0].created, datetime)
assert isinstance(actual[0].modified, datetime)

# TODO: test pagination

Expand Down

0 comments on commit 721b5c7

Please sign in to comment.