From d2299090167ad5f5eb74062b12707fc6b5c0bf4e Mon Sep 17 00:00:00 2001
From: Elia Migliore <elia.migliore@aiven.io>
Date: Wed, 3 Jul 2024 11:53:12 +0200
Subject: [PATCH] fix: get rid of the path for fully qualified names.

This is not a complete normalization feature but rather a fix for fully qualified paths vs. simple name references. This fix does not handle the various ways a type can be expressed with a partial path.

Long explanation below:

If we accept the specifications from buf as correct, we can look at how a type reference is defined [here](https://protobuf.com/docs/language-spec#type-references).

The main problem with the previous implementation is that the current parser can't tell if a fully-qualified type reference in dot notation is the same as one identified by name alone aka simple name notation ([fully qualified reference](https://protobuf.com/docs/language-spec#fully-qualified-references)).

The fix does not consider all the different ways users can define a relative reference: schemas
 that express a [relative reference](https://protobuf.com/docs/language-spec#relative-references) in different ways will,
 for now, still be considered different even after normalization.

Right now, our logic removes the `.package_name` (+ the Message scope) part
 before comparing field modifications (in fact it re-writes the schema using the simple name notation).
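
For example (names taken from the test fixtures added in this patch), inside `package my.awesome.customer.v1` the field type `.my.awesome.customer.v1.NestedValue` is rewritten to the simple name `NestedValue`, and the rewrite is skipped when the simple name would not be uniquely resolvable in the TypeTree. A minimal sketch of the prefix stripping, using the `remove_prefix` helper added to `karapace/utils.py` by this patch (the uniqueness check is omitted for brevity):

```python
# Sketch of the per-field prefix stripping done in normalize_type_field_element();
# the TypeTree uniqueness guard is not shown here.
from karapace.utils import remove_prefix

package = "my.awesome.customer.v1"

# a fully-qualified reference to a type of the local package becomes a simple name
local_type = remove_prefix(remove_prefix(".my.awesome.customer.v1.NestedValue", "."), f"{package}.")
assert local_type == "NestedValue"

# a type from another package only loses the leading dot
google_type = remove_prefix(remove_prefix(".google.protobuf.Timestamp", "."), f"{package}.")
assert google_type == "google.protobuf.Timestamp"
```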

Even though the TypeTree (trie data structure) could help resolve [relative references](https://protobuf.com/docs/language-spec#relative-references), we don't want to add this feature in the python implementation now because it might cause new bugs due to the non-trivial behaviour of the protoc compiler.

We plan to properly normalize the protobuf schemas later.

We'll use protobuf descriptors (after the compilation and linking step) to gather type references that are already resolved, and we will treat all protobuf types using their fully qualified names.
Properly handling all path variations would mean reimplementing the protoc compiler behavior; we prefer relying on the already processed proto descriptor.

So, for now, we'll only implement normalization for fully-qualified references in dot notation and for simple names alone.

**NB:** This does not change the semantics of the message since the local scope is always the one
 with the highest priority, so if you drop the fully-qualified reference, protoc will resolve
 it to the type defined in the package scope.
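
A minimal sketch of how the new normalization entry point is invoked (mirroring the `schema_models.py` change below); the schema body is the dependency fixture from the new tests, and no references or dependencies are assumed:

```python
# Sketch only: normalization is now applied to an already-parsed ProtobufSchema
# via NormalizedProtobufSchema.from_protobuf_schema() instead of a constructor argument.
from karapace.protobuf.proto_normalizations import NormalizedProtobufSchema
from karapace.protobuf.schema import ProtobufSchema

schema_definition = """\
syntax = "proto3";
package my.awesome.customer.v1;

message NestedValue {
    string value = 1;
}
"""

parsed = ProtobufSchema(schema_definition, None, None)  # no references, no dependencies
normalized = NormalizedProtobufSchema.from_protobuf_schema(parsed)
print(normalized.to_schema())  # rendered with simple-name references where unambiguous
```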
---
 karapace/protobuf/proto_normalizations.py     |  96 +++++---
 karapace/protobuf/schema.py                   |  87 +------
 karapace/protobuf/type_tree.py                | 121 ++++++++++
 karapace/schema_models.py                     |   2 +-
 karapace/utils.py                             |  13 ++
 .../protobuf/test_protobuf_normalization.py   | 215 +++++++++++++++++-
 tests/unit/test_utils.py                      |  30 +++
 7 files changed, 441 insertions(+), 123 deletions(-)
 create mode 100644 karapace/protobuf/type_tree.py
 create mode 100644 tests/unit/test_utils.py

diff --git a/karapace/protobuf/proto_normalizations.py b/karapace/protobuf/proto_normalizations.py
index 9f65a2623..6a5356103 100644
--- a/karapace/protobuf/proto_normalizations.py
+++ b/karapace/protobuf/proto_normalizations.py
@@ -5,12 +5,12 @@
 
 from __future__ import annotations
 
-from karapace.dependency import Dependency
 from karapace.protobuf.enum_constant_element import EnumConstantElement
 from karapace.protobuf.enum_element import EnumElement
 from karapace.protobuf.extend_element import ExtendElement
 from karapace.protobuf.field_element import FieldElement
 from karapace.protobuf.group_element import GroupElement
+from karapace.protobuf.known_dependency import KnownDependency
 from karapace.protobuf.message_element import MessageElement
 from karapace.protobuf.one_of_element import OneOfElement
 from karapace.protobuf.option_element import OptionElement
@@ -19,8 +19,9 @@
 from karapace.protobuf.schema import ProtobufSchema
 from karapace.protobuf.service_element import ServiceElement
 from karapace.protobuf.type_element import TypeElement
-from karapace.schema_references import Reference
-from typing import Mapping, Sequence
+from karapace.protobuf.type_tree import TypeTree
+from karapace.utils import remove_prefix
+from typing import Sequence
 
 import abc
 
@@ -77,15 +78,9 @@ class NormalizedGroupElement(GroupElement):
 class NormalizedProtobufSchema(ProtobufSchema):
     proto_file_element: NormalizedProtoFileElement
 
-    def __init__(
-        self,
-        schema: str,
-        references: Sequence[Reference] | None = None,
-        dependencies: Mapping[str, Dependency] | None = None,
-        proto_file_element: ProtoFileElement | None = None,
-    ) -> None:
-        super().__init__(schema, references, dependencies, proto_file_element)
-        self.proto_file_element = normalize(self.proto_file_element)
+    @staticmethod
+    def from_protobuf_schema(protobuf_schema: ProtobufSchema) -> NormalizedProtobufSchema:
+        return normalize(protobuf_schema)
 
 
 class NormalizedOneOfElement(OneOfElement):
@@ -93,12 +88,29 @@ class NormalizedOneOfElement(OneOfElement):
     groups: Sequence[NormalizedGroupElement]
 
 
-def type_field_element_with_sorted_options(type_field: FieldElement) -> NormalizedFieldElement:
+def normalize_type_field_element(type_field: FieldElement, package: str, type_tree: TypeTree) -> NormalizedFieldElement:
     sorted_options = None if type_field.options is None else list(sorted(type_field.options, key=sort_by_name))
+    field_type_normalized = remove_prefix(remove_prefix(type_field.element_type, "."), f"{package}.")
+    reference_in_type_tree = type_tree.type_in_tree(field_type_normalized)
+    google_included_type = (
+        field_type_normalized in KnownDependency.index_simple or field_type_normalized in KnownDependency.index
+    )
+    element_type = (
+        field_type_normalized
+        # we can remove the package from the type only if there aren't clashing names in the same
+        # definition with the same relative prefix.
+        if (
+            reference_in_type_tree is not None
+            and reference_in_type_tree.is_uniquely_identifiable_type
+            and not google_included_type
+        )
+        or google_included_type
+        else type_field.element_type
+    )
     return NormalizedFieldElement(
         location=type_field.location,
         label=type_field.label,
-        element_type=type_field.element_type,
+        element_type=element_type,
         name=type_field.name,
         default_value=type_field.default_value,
         json_name=type_field.json_name,
@@ -135,9 +147,9 @@ def enum_element_with_sorted_options(enum_element: EnumElement) -> NormalizedEnu
     )
 
 
-def groups_with_sorted_options(group: GroupElement) -> NormalizedGroupElement:
+def groups_with_sorted_options(group: GroupElement, package: str, type_tree: TypeTree) -> NormalizedGroupElement:
     sorted_fields = (
-        None if group.fields is None else [type_field_element_with_sorted_options(field) for field in group.fields]
+        None if group.fields is None else [normalize_type_field_element(field, package, type_tree) for field in group.fields]
     )
     return NormalizedGroupElement(
         label=group.label,
@@ -149,10 +161,10 @@ def groups_with_sorted_options(group: GroupElement) -> NormalizedGroupElement:
     )
 
 
-def one_ofs_with_sorted_options(one_ofs: OneOfElement) -> NormalizedOneOfElement:
+def one_ofs_with_sorted_options(one_ofs: OneOfElement, package: str, type_tree: TypeTree) -> NormalizedOneOfElement:
     sorted_options = None if one_ofs.options is None else list(sorted(one_ofs.options, key=sort_by_name))
-    sorted_fields = [type_field_element_with_sorted_options(field) for field in one_ofs.fields]
-    sorted_groups = [groups_with_sorted_options(group) for group in one_ofs.groups]
+    sorted_fields = [normalize_type_field_element(field, package, type_tree) for field in one_ofs.fields]
+    sorted_groups = [groups_with_sorted_options(group, package, type_tree) for group in one_ofs.groups]
 
     return NormalizedOneOfElement(
         name=one_ofs.name,
@@ -163,11 +175,15 @@ def one_ofs_with_sorted_options(one_ofs: OneOfElement) -> NormalizedOneOfElement
     )
 
 
-def message_element_with_sorted_options(message_element: MessageElement) -> NormalizedMessageElement:
+def message_element_with_sorted_options(
+    message_element: MessageElement, package: str, type_tree: TypeTree
+) -> NormalizedMessageElement:
     sorted_options = None if message_element.options is None else list(sorted(message_element.options, key=sort_by_name))
-    sorted_nested_types = [type_element_with_sorted_options(nested_type) for nested_type in message_element.nested_types]
-    sorted_fields = [type_field_element_with_sorted_options(field) for field in message_element.fields]
-    sorted_one_ofs = [one_ofs_with_sorted_options(one_of) for one_of in message_element.one_ofs]
+    sorted_nested_types = [
+        type_element_with_sorted_options(nested_type, package, type_tree) for nested_type in message_element.nested_types
+    ]
+    sorted_fields = [normalize_type_field_element(field, package, type_tree) for field in message_element.fields]
+    sorted_one_ofs = [one_ofs_with_sorted_options(one_of, package, type_tree) for one_of in message_element.one_ofs]
 
     return NormalizedMessageElement(
         location=message_element.location,
@@ -183,16 +199,16 @@ def message_element_with_sorted_options(message_element: MessageElement) -> Norm
     )
 
 
-def type_element_with_sorted_options(type_element: TypeElement) -> NormalizedTypeElement:
+def type_element_with_sorted_options(type_element: TypeElement, package: str, type_tree: TypeTree) -> NormalizedTypeElement:
     sorted_nested_types: list[TypeElement] = []
 
     for nested_type in type_element.nested_types:
         if isinstance(nested_type, EnumElement):
             sorted_nested_types.append(enum_element_with_sorted_options(nested_type))
         elif isinstance(nested_type, MessageElement):
-            sorted_nested_types.append(message_element_with_sorted_options(nested_type))
+            sorted_nested_types.append(message_element_with_sorted_options(nested_type, package, type_tree))
         else:
-            raise ValueError("Unknown type element")  # tried with assert_never but it did not work
+            raise ValueError(f"Unknown type element {type(nested_type)}")  # tried with assert_never but it did not work
 
     # doing it here since the subtypes do not declare the nested_types property
     type_element.nested_types = sorted_nested_types
@@ -201,16 +217,18 @@ def type_element_with_sorted_options(type_element: TypeElement) -> NormalizedTyp
         return enum_element_with_sorted_options(type_element)
 
     if isinstance(type_element, MessageElement):
-        return message_element_with_sorted_options(type_element)
+        return message_element_with_sorted_options(type_element, package, type_tree)
 
-    raise ValueError("Unknown type element")  # tried with assert_never but it did not work
+    raise ValueError(f"Unknown type element of type {type(type_element)}")  # tried with assert_never but it did not work
 
 
-def extends_element_with_sorted_options(extend_element: ExtendElement) -> NormalizedExtendElement:
+def extends_element_with_sorted_options(
+    extend_element: ExtendElement, package: str, type_tree: TypeTree
+) -> NormalizedExtendElement:
     sorted_fields = (
         None
         if extend_element.fields is None
-        else [type_field_element_with_sorted_options(field) for field in extend_element.fields]
+        else [normalize_type_field_element(field, package, type_tree) for field in extend_element.fields]
     )
     return NormalizedExtendElement(
         location=extend_element.location,
@@ -249,19 +267,22 @@ def service_element_with_sorted_options(service_element: ServiceElement) -> Norm
     )
 
 
-def normalize(proto_file_element: ProtoFileElement) -> NormalizedProtoFileElement:
+def normalize(protobuf_schema: ProtobufSchema) -> NormalizedProtobufSchema:
+    proto_file_element = protobuf_schema.proto_file_element
+    type_tree = protobuf_schema.types_tree()
+    package = proto_file_element.package_name or ""
     sorted_types: Sequence[NormalizedTypeElement] = [
-        type_element_with_sorted_options(type_element) for type_element in proto_file_element.types
+        type_element_with_sorted_options(type_element, package, type_tree) for type_element in proto_file_element.types
     ]
     sorted_options = list(sorted(proto_file_element.options, key=sort_by_name))
     sorted_services: Sequence[NormalizedServiceElement] = [
         service_element_with_sorted_options(service) for service in proto_file_element.services
     ]
     sorted_extend_declarations: Sequence[NormalizedExtendElement] = [
-        extends_element_with_sorted_options(extend) for extend in proto_file_element.extend_declarations
+        extends_element_with_sorted_options(extend, package, type_tree) for extend in proto_file_element.extend_declarations
     ]
 
-    return NormalizedProtoFileElement(
+    normalized_protobuf_element = NormalizedProtoFileElement(
         location=proto_file_element.location,
         package_name=proto_file_element.package_name,
         syntax=proto_file_element.syntax,
@@ -272,3 +293,10 @@ def normalize(proto_file_element: ProtoFileElement) -> NormalizedProtoFileElemen
         extend_declarations=sorted_extend_declarations,
         options=sorted_options,
     )
+
+    return NormalizedProtobufSchema(
+        protobuf_schema.schema,
+        protobuf_schema.references,
+        protobuf_schema.dependencies,
+        normalized_protobuf_element,
+    )
diff --git a/karapace/protobuf/schema.py b/karapace/protobuf/schema.py
index 9c407bdcd..fdd72a891 100644
--- a/karapace/protobuf/schema.py
+++ b/karapace/protobuf/schema.py
@@ -23,12 +23,12 @@
 from karapace.protobuf.proto_parser import ProtoParser
 from karapace.protobuf.serialization import deserialize, serialize
 from karapace.protobuf.type_element import TypeElement
+from karapace.protobuf.type_tree import SourceFileReference, TypeTree
 from karapace.protobuf.utils import append_documentation, append_indented
 from karapace.schema_references import Reference
-from typing import Iterable, Mapping, Sequence
+from typing import Mapping, Sequence
 
 import binascii
-import itertools
 
 
 def add_slashes(text: str) -> str:
@@ -122,88 +122,6 @@ class UsedType:
     used_attribute_type: str
 
 
-@default_dataclass
-class SourceFileReference:
-    reference: str
-    import_order: int
-
-
-@default_dataclass
-class TypeTree:
-    token: str
-    children: list[TypeTree]
-    source_reference: SourceFileReference | None
-
-    def source_reference_tree_recursive(self) -> Iterable[SourceFileReference | None]:
-        sources = [] if self.source_reference is None else [self.source_reference]
-        for child in self.children:
-            sources = itertools.chain(sources, child.source_reference_tree())
-        return sources
-
-    def source_reference_tree(self) -> Iterable[SourceFileReference]:
-        return filter(lambda x: x is not None, self.source_reference_tree_recursive())
-
-    def inserted_elements(self) -> int:
-        """
-        Returns the newest element generation accessible from that node.
-        Where with the term generation we mean the order for which a message
-        has been imported.
-        If called on the root of the tree it corresponds with the number of
-        fully specified path objects present in the tree.
-        """
-        return max(reference.import_order for reference in self.source_reference_tree())
-
-    def oldest_matching_import(self) -> int:
-        """
-        Returns the oldest element generation accessible from that node.
-        Where with the term generation we mean the order for which a message
-        has been imported.
-        """
-        return min(reference.import_order for reference in self.source_reference_tree())
-
-    def expand_missing_absolute_path_recursive(self, oldest_import: int) -> Sequence[str]:
-        """
-        Method that, once called on a node, traverse all the leaf and
-        return the oldest imported element with the common postfix.
-        This is also the current behaviour
-        of protobuf while dealing with a not fully specified path, it seeks for
-        the firstly imported message with a matching path.
-        """
-        if self.source_reference is not None:
-            if self.source_reference.import_order == oldest_import:
-                return [self.token]
-            return []
-
-        for child in self.children:
-            maybe_oldest_child = child.expand_missing_absolute_path_recursive(oldest_import)
-            if maybe_oldest_child is not None:
-                return list(maybe_oldest_child) + [self.token]
-
-        return []
-
-    def expand_missing_absolute_path(self) -> Sequence[str]:
-        oldest_import = self.oldest_matching_import()
-        expanded_missing_path = self.expand_missing_absolute_path_recursive(oldest_import)
-        assert (
-            expanded_missing_path is not None
-        ), "each node should have, by construction, at least a leaf that is a fully specified path"
-        return expanded_missing_path[:-1]  # skipping myself since I was matched
-
-    @property
-    def is_fully_qualified_type(self) -> bool:
-        return len(self.children) == 0
-
-    def represent(self, level=0) -> str:
-        spacing = " " * 3 * level
-        if self.is_fully_qualified_type:
-            return f"{spacing}>{self.token}"
-        child_repr = "\n".join(child.represent(level=level + 1) for child in self.children)
-        return f"{spacing}{self.token} ->\n{child_repr}"
-
-    def __repr__(self) -> str:
-        return self.represent()
-
-
 def _add_new_type_recursive(
     parent_tree: TypeTree,
     remaining_tokens: list[str],
@@ -258,6 +176,7 @@ def __init__(
     ) -> None:
         if type(schema).__name__ != "str":
             raise IllegalArgumentException("Non str type of schema string")
+        self.schema = schema
         self.cache_string = ""
 
         if proto_file_element is not None:
diff --git a/karapace/protobuf/type_tree.py b/karapace/protobuf/type_tree.py
new file mode 100644
index 000000000..f9279e864
--- /dev/null
+++ b/karapace/protobuf/type_tree.py
@@ -0,0 +1,121 @@
+"""
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+from __future__ import annotations
+
+from collections.abc import Iterable, Sequence
+from karapace.dataclasses import default_dataclass
+from karapace.utils import remove_prefix
+
+import itertools
+
+
+@default_dataclass
+class SourceFileReference:
+    reference: str
+    import_order: int
+
+
+@default_dataclass
+class TypeTree:
+    token: str
+    children: list[TypeTree]
+    source_reference: SourceFileReference | None
+
+    def source_reference_tree_recursive(self) -> list[SourceFileReference]:
+        return list(
+            itertools.chain(
+                [] if self.source_reference is None else [self.source_reference],
+                *[child.source_reference_tree() for child in self.children],
+            )
+        )
+
+    def source_reference_tree(self) -> Iterable[SourceFileReference]:
+        return filter(lambda x: x is not None, self.source_reference_tree_recursive())
+
+    def inserted_elements(self) -> int:
+        """
+        Returns the newest element generation accessible from this node,
+        where by generation we mean the order in which a message
+        has been imported.
+        If called on the root of the tree, it corresponds to the number of
+        fully specified path objects present in the tree.
+        """
+        return max(reference.import_order for reference in self.source_reference_tree())
+
+    def oldest_matching_import(self) -> int:
+        """
+        Returns the oldest element generation accessible from this node,
+        where by generation we mean the order in which a message
+        has been imported.
+        """
+        return min(reference.import_order for reference in self.source_reference_tree())
+
+    def expand_missing_absolute_path_recursive(self, oldest_import: int) -> Sequence[str]:
+        """
+        Once called on a node, traverses all the leaves and
+        returns the oldest imported element with the common postfix.
+        This is also the current behaviour
+        of protobuf when dealing with a not fully specified path: it seeks
+        the first imported message with a matching path.
+        """
+        if self.source_reference is not None:
+            if self.source_reference.import_order == oldest_import:
+                return [self.token]
+            return []
+
+        for child in self.children:
+            maybe_oldest_child = child.expand_missing_absolute_path_recursive(oldest_import)
+            if maybe_oldest_child is not None:
+                return list(maybe_oldest_child) + [self.token]
+
+        return []
+
+    @staticmethod
+    def _type_in_tree(tree: TypeTree, remaining_tokens: list[str]) -> TypeTree | None:
+        if remaining_tokens:
+            to_seek = remaining_tokens.pop()
+
+            for child in tree.children:
+                if child.token == to_seek:
+                    return TypeTree._type_in_tree(child, remaining_tokens)
+            return None
+        return tree
+
+    def type_in_tree(self, queried_type: str) -> TypeTree | None:
+        return TypeTree._type_in_tree(self, remove_prefix(queried_type, ".").split("."))
+
+    def expand_missing_absolute_path(self) -> Sequence[str]:
+        oldest_import = self.oldest_matching_import()
+        expanded_missing_path = self.expand_missing_absolute_path_recursive(oldest_import)
+        assert (
+            expanded_missing_path is not None
+        ), "each node should have, by construction, at least a leaf that is a fully specified path"
+        return expanded_missing_path[:-1]  # skipping myself since I was matched
+
+    @property
+    def is_fully_qualified_type(self) -> bool:
+        return len(self.children) == 0
+
+    @property
+    def types_with_same_scope(self) -> int:
+        # if the source reference is set, it means that the type is defined there by a source.
+        return sum(
+            [children.types_with_same_scope for children in self.children] + [1 if self.source_reference is not None else 0]
+        )
+
+    @property
+    def is_uniquely_identifiable_type(self) -> bool:
+        # to be uniquely identifiable we should find exactly 1 source that provides the current definition.
+        return self.types_with_same_scope == 1
+
+    def represent(self, level: int = 0) -> str:
+        spacing = " " * 3 * level
+        if self.is_fully_qualified_type:
+            return f"{spacing}>{self.token}"
+        child_repr = "\n".join(child.represent(level=level + 1) for child in self.children)
+        return f"{spacing}{self.token} ->\n{child_repr}"
+
+    def __repr__(self) -> str:
+        return self.represent()
diff --git a/karapace/schema_models.py b/karapace/schema_models.py
index d21917025..896e73f26 100644
--- a/karapace/schema_models.py
+++ b/karapace/schema_models.py
@@ -74,7 +74,7 @@ def parse_protobuf_schema_definition(
     protobuf_schema = (
         ProtobufSchema(schema_definition, references, dependencies)
         if not normalize
-        else NormalizedProtobufSchema(schema_definition, references, dependencies)
+        else NormalizedProtobufSchema.from_protobuf_schema(ProtobufSchema(schema_definition, references, dependencies))
     )
     if validate_references:
         result = protobuf_schema.verify_schema_dependencies()
diff --git a/karapace/utils.py b/karapace/utils.py
index d566b0ef1..8066c5565 100644
--- a/karapace/utils.py
+++ b/karapace/utils.py
@@ -243,3 +243,16 @@ def log(
             self.logger.debug(self._log_format % tuple(values), extra=extra)
         except Exception:  # pylint: disable=broad-except
             self.logger.exception("Error in logging")
+
+
+def remove_prefix(string: str, prefix: str) -> str:
+    """
+    Backport of str.removeprefix, which is not available in Python 3.8.
+    """
+    i = 0
+    while i < len(prefix):
+        if i >= len(string) or string[i] != prefix[i]:
+            return string
+        i += 1
+
+    return string[i:]
diff --git a/tests/unit/protobuf/test_protobuf_normalization.py b/tests/unit/protobuf/test_protobuf_normalization.py
index b772b293c..0f752d563 100644
--- a/tests/unit/protobuf/test_protobuf_normalization.py
+++ b/tests/unit/protobuf/test_protobuf_normalization.py
@@ -2,14 +2,19 @@
 Copyright (c) 2024 Aiven Ltd
 See LICENSE for details
 """
+
+from karapace.dependency import Dependency
 from karapace.protobuf.compare_result import CompareResult
 from karapace.protobuf.location import Location
 from karapace.protobuf.proto_normalizations import normalize
-from karapace.protobuf.proto_parser import ProtoParser
+from karapace.schema_models import parse_protobuf_schema_definition, ValidatedTypedSchema
+from karapace.schema_type import SchemaType
+from karapace.typing import Subject, Version
+from typing import Final
 
 import pytest
 
-location: Location = Location("some/folder", "file.proto")
+LOCATION: Final[Location] = Location("somefolder", "file.proto")
 
 
 # this would be a good case for using a property based test with a well-formed message generator
@@ -511,10 +516,212 @@
     ),
 )
 def test_differently_ordered_options_normalizes_equally(ordered_schema: str, unordered_schema: str) -> None:
-    ordered_proto = ProtoParser.parse(location, ordered_schema)
-    unordered_proto = ProtoParser.parse(location, unordered_schema)
+    ordered_proto = parse_protobuf_schema_definition(
+        schema_definition=ordered_schema,
+        references=None,
+        dependencies=None,
+        validate_references=True,
+        normalize=True,
+    )
+    unordered_proto = parse_protobuf_schema_definition(
+        schema_definition=unordered_schema,
+        references=None,
+        dependencies=None,
+        validate_references=True,
+        normalize=True,
+    )
 
     result = CompareResult()
     normalize(ordered_proto).compare(normalize(unordered_proto), result)
     assert result.is_compatible()
     assert normalize(ordered_proto).to_schema() == normalize(unordered_proto).to_schema()
+
+
+DEPENDENCY = """\
+syntax = "proto3";
+package my.awesome.customer.v1;
+
+message NestedValue {
+    string value = 1;
+}
+\
+"""
+
+PROTO_WITH_FULLY_QUALIFIED_PATHS = """\
+syntax = "proto3";
+package my.awesome.customer.v1;
+
+import "my/awesome/customer/v1/nested_value.proto";
+import "google/protobuf/timestamp.proto";
+
+option csharp_namespace = "my.awesome.customer.V1";
+option go_package = "github.com/customer/api/my/awesome/customer/v1;dspv1";
+option java_multiple_files = true;
+option java_outer_classname = "EventValueProto";
+option java_package = "com.my.awesome.customer.v1";
+option objc_class_prefix = "TDD";
+option php_metadata_namespace = "My\\Awesome\\Customer\\V1";
+option php_namespace = "My\\Awesome\\Customer\\V1";
+option ruby_package = "My::Awesome::Customer::V1";
+
+message EventValue {
+  .my.awesome.customer.v1.NestedValue nested_value = 1;
+  .google.protobuf.Timestamp created_at = 2;
+}
+"""
+
+
+PROTO_WITH_SIMPLE_NAMES = """\
+syntax = "proto3";
+package my.awesome.customer.v1;
+
+import "my/awesome/customer/v1/nested_value.proto";
+import "google/protobuf/timestamp.proto";
+
+option csharp_namespace = "my.awesome.customer.V1";
+option go_package = "github.com/customer/api/my/awesome/customer/v1;dspv1";
+option java_multiple_files = true;
+option java_outer_classname = "EventValueProto";
+option java_package = "com.my.awesome.customer.v1";
+option objc_class_prefix = "TDD";
+option php_metadata_namespace = "My\\Awesome\\Customer\\V1";
+option php_namespace = "My\\Awesome\\Customer\\V1";
+option ruby_package = "My::Awesome::Customer::V1";
+
+message EventValue {
+  NestedValue nested_value = 1;
+  google.protobuf.Timestamp created_at = 2;
+}"""
+
+
+def test_full_path_and_simple_names_are_equal() -> None:
+    """
+    This test aims to ensure that, after the normalization process, a schema expressed with simple names matches the
+    same schema expressed with fully qualified references.
+    This does not consider the normalization process for the different ways a type can be expressed as a relative reference.
+
+    Long explanation below:
+
+    If we accept the specifications from buf as correct, we can look at how a type
+    reference is defined here: https://protobuf.com/docs/language-spec#type-references.
+
+    The main problem with the previous implementation is that the current parser can't tell
+    if a fully-qualified type reference in dot notation is the same as one identified by name
+    alone aka simple name notation (https://protobuf.com/docs/language-spec#fully-qualified-references).
+
+    The fix does not consider all the different ways users can define a relative reference: schemas
+     that express a relative reference in different ways will, for now,
+     still be considered different even after normalization (https://protobuf.com/docs/language-spec#relative-references).
+
+    Right now, our logic removes the `.package_name` (+ the Message scope) part
+     before comparing field modifications (in fact it re-writes the schema using the simple name notation).
+
+    Even though the TypeTree (trie data structure) could help resolve
+    relative references (https://protobuf.com/docs/language-spec#relative-references),
+    we don't want to add this feature in the python implementation now because it might cause new bugs due to the
+    non-trivial behaviour of the protoc compiler.
+
+    We plan to properly normalize the protobuf schemas later.
+
+    We'll use protobuf descriptors (after the compilation and linking step)
+    to gather type references that are already resolved, and we will treat all protobuf types
+    using their fully qualified names.
+    Properly handling all path variations would mean reimplementing the protoc compiler behavior;
+     we prefer relying on the already processed proto descriptor.
+
+    So, for now, we'll only implement normalization for fully-qualified references
+    in dot notation and for simple names alone.
+
+    This does not change the semantics of the message since the local scope is always the one
+    with the highest priority, so if you drop the fully-qualified reference, protoc will resolve
+    it to the type defined in the package scope.
+    """
+    no_ref_schema = ValidatedTypedSchema.parse(SchemaType.PROTOBUF, DEPENDENCY, normalize=True)
+    dep = Dependency("NestedValue.proto", Subject("nested_value"), Version(1), no_ref_schema)
+    dependencies = {"NestedValue.proto": dep}
+    fully_qualitifed_simple_name_notation = parse_protobuf_schema_definition(
+        schema_definition=PROTO_WITH_SIMPLE_NAMES,
+        references=None,
+        dependencies=dependencies,
+        validate_references=True,
+        normalize=False,
+    )
+    fully_qualitifed_dot_notation = parse_protobuf_schema_definition(
+        schema_definition=PROTO_WITH_FULLY_QUALIFIED_PATHS,
+        references=None,
+        dependencies=dependencies,
+        validate_references=True,
+        normalize=True,
+    )
+    result = CompareResult()
+    fully_qualitifed_simple_name_notation.compare(normalize(fully_qualitifed_dot_notation), result)
+    assert result.is_compatible(), "normalized isn't equal to simple name"
+    assert (
+        normalize(fully_qualitifed_dot_notation).to_schema() == fully_qualitifed_simple_name_notation.to_schema()
+    ), "normalization should transform it into an equivalent simple name"
+
+    fully_qualitifed_simple_name_notation.compare(normalize(fully_qualitifed_simple_name_notation), result)
+    assert result.is_compatible(), "normalization shouldn't change a simple name notation protofile"
+    assert (
+        fully_qualitifed_simple_name_notation.to_schema() == normalize(fully_qualitifed_simple_name_notation).to_schema()
+    ), "also the string rendering shouldn't change a simple name notation protofile"
+
+
+TRICKY_DEPENDENCY = """\
+syntax = "proto3";
+package org.my.awesome.customer.v1;
+
+message NestedValue {
+    string value = 1;
+}\
+"""
+
+PROTO_WITH_FULLY_QUALIFIED_PATHS_AND_TRICKY_DEPENDENCY = """\
+syntax = "proto3";
+
+package my.awesome.customer.v1;
+
+import "org/my/awesome/customer/v1/nested_value.proto";
+import "my/awesome/customer/v1/nested_value.proto";
+import "google/protobuf/timestamp.proto";
+
+option csharp_namespace = "my.awesome.customer.V1";
+option go_package = "github.com/customer/api/my/awesome/customer/v1;dspv1";
+option java_multiple_files = true;
+option java_outer_classname = "EventValueProto";
+option java_package = "com.my.awesome.customer.v1";
+option objc_class_prefix = "TDD";
+option php_metadata_namespace = "MyAwesomeCustomerV1";
+option php_namespace = "MyAwesomeCustomerV1";
+option ruby_package = "My::Awesome::Customer::V1";
+
+message EventValue {
+  .my.awesome.customer.v1.NestedValue nested_value = 1;
+
+  google.protobuf.Timestamp created_at = 2;
+}
+"""
+
+
+def test_full_path_and_simple_names_are_not_equal_if_simple_name_is_not_unique() -> None:
+    no_ref_schema = ValidatedTypedSchema.parse(SchemaType.PROTOBUF, DEPENDENCY, normalize=True)
+    tricky_no_ref_schema = ValidatedTypedSchema.parse(SchemaType.PROTOBUF, TRICKY_DEPENDENCY, normalize=True)
+    dep = Dependency("NestedValue.proto", Subject("nested_value"), Version(1), no_ref_schema)
+    tricky_dep = Dependency("TrickyNestedValue.proto", Subject("tricky_nested_value"), Version(1), tricky_no_ref_schema)
+    dependencies = {"NestedValue.proto": dep, "TrickyNestedValue.proto": tricky_dep}
+    schema = parse_protobuf_schema_definition(
+        schema_definition=PROTO_WITH_FULLY_QUALIFIED_PATHS_AND_TRICKY_DEPENDENCY,
+        references=None,
+        dependencies=dependencies,
+        validate_references=True,
+        normalize=False,
+    ).to_schema()
+    normalized_schema = parse_protobuf_schema_definition(
+        schema_definition=PROTO_WITH_FULLY_QUALIFIED_PATHS_AND_TRICKY_DEPENDENCY,
+        references=None,
+        dependencies=dependencies,
+        validate_references=True,
+        normalize=True,
+    ).to_schema()
+
+    assert normalized_schema == schema, "Since the simple name does not uniquely identify the type, the fully-qualified reference is not replaced"
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
new file mode 100644
index 000000000..ed5a980e1
--- /dev/null
+++ b/tests/unit/test_utils.py
@@ -0,0 +1,30 @@
+"""
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+from karapace.utils import remove_prefix
+
+
+def test_remove_prefix_basic() -> None:
+    result = remove_prefix("hello world", "hello ")
+    assert result == "world"
+
+
+def test_remove_prefix_empty_prefix() -> None:
+    result = remove_prefix("hello world", "")
+    assert result == "hello world"
+
+
+def test_remove_prefix_prefix_not_in_string() -> None:
+    result = remove_prefix("hello world", "hey ")
+    assert result == "hello world"
+
+
+def test_remove_prefix_multiple_occurrences_of_prefix() -> None:
+    result = remove_prefix("hello hello world", "hello ")
+    assert result == "hello world"
+
+
+def test_remove_prefix_empty_string() -> None:
+    result = remove_prefix("", "hello ")
+    assert result == ""