Skip to content

Commit

Permalink
Bounded Trie translation.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Nov 26, 2024
1 parent befa30c commit e41e73b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
38 changes: 38 additions & 0 deletions sdks/python/apache_beam/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from typing import Optional
from typing import Set

from apache_beam.portability.api import metrics_pb2

try:
import cython
except ImportError:
Expand Down Expand Up @@ -669,6 +671,29 @@ def __init__(self):
self._children: Optional[dict[str, '_BoundedTrieNode']] = {}
self._truncated = False

def to_proto(self) -> metrics_pb2.BoundedTrieNode:
  """Serializes this node, and recursively its children, to a proto.

  Returns:
    A BoundedTrieNode message with this node's truncated flag. The children
    map is filled in only when this node has a non-empty child mapping;
    otherwise ``None`` is passed for it.
  """
  child_protos = None
  if self._children:
    child_protos = {}
    for segment, subtree in self._children.items():
      child_protos[segment] = subtree.to_proto()
  return metrics_pb2.BoundedTrieNode(
      truncated=self._truncated, children=child_protos)

@staticmethod
def from_proto(proto: metrics_pb2.BoundedTrieNode) -> '_BoundedTrieNode':
  """Rebuilds a trie node, and recursively its subtree, from a proto.

  Args:
    proto: The BoundedTrieNode message to convert.

  Returns:
    A new _BoundedTrieNode whose truncated flag and children mirror ``proto``.
  """
  node = _BoundedTrieNode()
  if proto.truncated:
    # A truncated node has no child mapping, so there is nothing to sum;
    # leave _size as the constructor left it and return early instead of
    # iterating over None.
    node._truncated = True
    node._children = None
    return node
  node._children = {
      name: _BoundedTrieNode.from_proto(child)
      for name, child in proto.children.items()
  }
  # A node counts for at least one entry even when it has no children.
  # (min() here would cap every subtree's size at 1.)
  node._size = max(1, sum(child._size for child in node._children.values()))
  return node

def size(self):
  """Returns the size currently recorded on this node."""
  recorded_size = self._size
  return recorded_size

Expand Down Expand Up @@ -764,6 +789,19 @@ def __init__(self, *, root=None, singleton=None, bound=_DEFAULT_BOUND):
self._root = root
self._bound = bound

def to_proto(self) -> metrics_pb2.BoundedTrie:
  """Converts this data into a BoundedTrie proto message.

  Returns:
    A BoundedTrie message carrying the bound, plus the singleton and the
    serialized root when they are set (a falsy value is passed as ``None``).
  """
  return metrics_pb2.BoundedTrie(
      bound=self._bound,
      # The attribute is `_singleton`; a misspelled name here raises
      # AttributeError as soon as a singleton is present.
      singleton=self._singleton if self._singleton else None,
      root=self._root.to_proto() if self._root else None)

@staticmethod
def from_proto(proto: metrics_pb2.BoundedTrie) -> 'BoundedTrieData':
  """Builds a BoundedTrieData from its BoundedTrie proto form.

  Args:
    proto: The BoundedTrie message to convert.

  Returns:
    A BoundedTrieData with the proto's bound, its singleton as a tuple (or
    ``None`` when empty), and its root node (or ``None`` when unset).
  """
  return BoundedTrieData(
      bound=proto.bound,
      singleton=tuple(proto.singleton) if proto.singleton else None,
      # Reading an unset message field yields a default instance rather than
      # None, so a truthiness test cannot detect absence; use HasField.
      root=(
          _BoundedTrieNode.from_proto(proto.root)
          if proto.HasField('root') else None))

def as_trie(self):
if self._root is not None:
return self._root
Expand Down
24 changes: 23 additions & 1 deletion sdks/python/apache_beam/metrics/monitoring_infos.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@
common_urns.monitoring_info_specs.USER_DISTRIBUTION_INT64.spec.urn)
USER_GAUGE_URN = common_urns.monitoring_info_specs.USER_LATEST_INT64.spec.urn
USER_STRING_SET_URN = common_urns.monitoring_info_specs.USER_SET_STRING.spec.urn
USER_BOUNDED_TRIE_URN = (
    common_urns.monitoring_info_specs.USER_BOUNDED_TRIE.spec.urn)
# URNs of every user-defined metric kind declared above.
USER_METRIC_URNS = {
    USER_COUNTER_URN,
    USER_DISTRIBUTION_URN,
    USER_GAUGE_URN,
    USER_STRING_SET_URN,
    USER_BOUNDED_TRIE_URN,
}
WORK_REMAINING_URN = common_urns.monitoring_info_specs.WORK_REMAINING.spec.urn
WORK_COMPLETED_URN = common_urns.monitoring_info_specs.WORK_COMPLETED.spec.urn
Expand All @@ -72,11 +75,13 @@
# Type URNs taken from the common monitoring-info type definitions.
LATEST_INT64_TYPE = common_urns.monitoring_info_types.LATEST_INT64_TYPE.urn
PROGRESS_TYPE = common_urns.monitoring_info_types.PROGRESS_TYPE.urn
STRING_SET_TYPE = common_urns.monitoring_info_types.SET_STRING_TYPE.urn
BOUNDED_TRIE_TYPE = common_urns.monitoring_info_types.BOUNDED_TRIE_TYPE.urn

# One set of type URNs per metric kind; each currently holds one entry.
COUNTER_TYPES = {SUM_INT64_TYPE}
DISTRIBUTION_TYPES = {DISTRIBUTION_INT64_TYPE}
GAUGE_TYPES = {LATEST_INT64_TYPE}
STRING_SET_TYPES = {STRING_SET_TYPE}
BOUNDED_TRIE_TYPES = {BOUNDED_TRIE_TYPE}

# TODO(migryz) extract values from beam_fn_api.proto::MonitoringInfoLabels
PCOLLECTION_LABEL = (
Expand Down Expand Up @@ -320,6 +325,23 @@ def user_set_string(namespace, name, metric, ptransform=None):
USER_STRING_SET_URN, STRING_SET_TYPE, metric, labels)


def user_bounded_trie(namespace, name, metric, ptransform=None):
  """Return the bounded trie monitoring info for the URN, metric and labels.

  The metric is converted to its proto form and serialized to bytes, which
  become the payload of the monitoring info.

  Args:
    namespace: User-defined namespace of BoundedTrie.
    name: Name of BoundedTrie.
    metric: The BoundedTrieData representing the metrics.
    ptransform: The ptransform id used as a label.
  """
  metric_labels = create_labels(
      ptransform=ptransform, namespace=namespace, name=name)
  serialized_trie = metric.to_proto().SerializeToString()
  return create_monitoring_info(
      USER_BOUNDED_TRIE_URN,
      BOUNDED_TRIE_TYPE,
      serialized_trie,
      metric_labels)


def create_monitoring_info(
urn, type_urn, payload, labels=None) -> metrics_pb2.MonitoringInfo:
"""Return the gauge monitoring info for the URN, type, metric and labels.
Expand Down

0 comments on commit e41e73b

Please sign in to comment.