From aadd71acbdd31913baccc7b7693cf27431dcb909 Mon Sep 17 00:00:00 2001 From: kbartlett Date: Thu, 21 Nov 2024 14:47:14 -0500 Subject: [PATCH] feat: connector for Neo4j --- metadata-ingestion/setup.py | 3 + .../tests/unit/test_neo4j_source.py | 325 +++++++++++------- 2 files changed, 197 insertions(+), 131 deletions(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 2469af74b0334..cb99d4955dd0b 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -524,6 +524,7 @@ "qlik-sense": sqlglot_lib | {"requests", "websocket-client"}, "sigma": sqlglot_lib | {"requests"}, "sac": sac, + "neo4j": {"pandas", "neo4j"}, } # This is mainly used to exclude plugins from the Docker image. @@ -668,6 +669,7 @@ "sigma", "sac", "cassandra", + "neo4j", ] if plugin for dependency in plugins[plugin] @@ -787,6 +789,7 @@ "sigma = datahub.ingestion.source.sigma.sigma:SigmaSource", "sac = datahub.ingestion.source.sac.sac:SACSource", "cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource", + "neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource", ], "datahub.ingestion.transformer.plugins": [ "pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership", diff --git a/metadata-ingestion/tests/unit/test_neo4j_source.py b/metadata-ingestion/tests/unit/test_neo4j_source.py index 07f41a37aa36d..3e35417e93e07 100644 --- a/metadata-ingestion/tests/unit/test_neo4j_source.py +++ b/metadata-ingestion/tests/unit/test_neo4j_source.py @@ -1,155 +1,218 @@ import unittest +from pathlib import Path import pandas as pd +import pytest from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.neo4j.neo4j_source import Neo4jConfig, Neo4jSource -class TestNeo4j(unittest.TestCase): - def setUp(self): - self.neo = Neo4jSource(Neo4jConfig(), PipelineContext(run_id="test")) - self.record_1 = { - "count": 1, - "labels": [], - "properties": { - "id": { - "unique": True, - "indexed": True, - "type": "STRING", - "existence": False, - }, - }, - "type": "node", - "relationships": { - "RELATIONSHIP_1": { - "count": 0, - "direction": "out", - "labels": ["Label_1"], - "properties": {}, - } - }, - } - self.record_2 = { - "count": 2, - "labels": [], - "properties": { - "id": { - "unique": True, - "indexed": True, - "type": "STRING", - "existence": False, - }, - "amount": { - "unique": True, - "indexed": True, - "type": "INTEGER", - "existence": False, - }, - }, - "type": "node", - "relationships": { - "RELATIONSHIP_1": { - "count": 0, - "direction": "out", - "labels": ["Label_1"], - "properties": {}, +@pytest.fixture +def tracking_uri(tmp_path: Path) -> str: + # return str(tmp_path / "neo4j") + return "neo4j+ssc://host:7687" + + +@pytest.fixture +def source(tracking_uri: str) -> Neo4jConfig: + return Neo4jSource( + ctx=PipelineContext(run_id="neo4j-test"), + config=Neo4jConfig(uri=tracking_uri), + ) + + +def data(): + return [ + { + "key": "Node_1", + "value": { + "count": 433026, + "relationships": { + "RELATIONSHIP_1": { + "count": 1, + "properties": { + "Relationship1_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, + "direction": "in", + "labels": ["Node_2"], + } }, "RELATIONSHIP_2": { - "count": 1, + "count": 2, + "properties": { + "Relationship2_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, "direction": "in", - "labels": ["Label_1", "Label_2"], - "properties": {}, + "labels": ["Node_3"], }, + "type": "node", + "properties": { + "Node1_Property1": { + "existence": False, + "type": "DATE", + "indexed": False, + "unique": False, + }, + "Node1_Property2": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + "Node1_Property3": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + }, + "labels": [], }, - } - self.record_3 = {"count": 3, "properties": {}, "type": "relationship"} - self.record_4 = { - "RELATIONSHIP_2": { - "count": 4, - "properties": {}, - "type": "relationship", + }, + { + "key": "Node_2", + "value": { + "count": 3, "relationships": { "RELATIONSHIP_1": { - "count": 0, + "count": 1, + "properties": { + "Relationship1_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, "direction": "out", - "labels": ["Label_1"], - "properties": {}, + "labels": ["Node_2"], + } + }, + "type": "node", + "properties": { + "Node2_Property1": { + "existence": False, + "type": "DATE", + "indexed": False, + "unique": False, }, - "RELATIONSHIP_2": { - "count": 1, - "direction": "in", - "labels": ["Label_1", "Label_2"], - "properties": {}, + "Node2_Property2": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + "Node2_Property3": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, }, }, - } - } - - def create_df(self): - data = { - "key": ["item1", "item2", "item3", "RELATIONSHIP_2"], - "value": [ - self.record_1, - self.record_2, - self.record_3, - self.record_4, - ], - } - df = pd.DataFrame(data) - return df - - def test_get_obj_type(self): - assert self.neo.get_obj_type(self.record_1) == "node" - assert self.neo.get_obj_type(self.record_2) == "node" - assert self.neo.get_obj_type(self.record_3) == "relationship" - - def test_get_relationships(self): - assert self.neo.get_relationships(self.record_1, self.create_df()) == { - "RELATIONSHIP_1": { - "count": 0, - "direction": "out", - "labels": ["Label_1"], - "properties": {}, - } - } - assert self.neo.get_relationships(self.record_2, self.create_df()) == { - "RELATIONSHIP_1": { - "count": 0, - "direction": "out", - "labels": ["Label_1"], - "properties": {}, - }, - "RELATIONSHIP_2": { - "count": 1, - "direction": "in", - "labels": ["Label_1", "Label_2"], - "properties": {}, + "labels": [], }, - } - assert self.neo.get_relationships(self.record_3, self.create_df()) is None - - def test_get_property_data_types(self): - record_1 = self.record_1.get("properties", None) - record_2 = self.record_2.get("properties", None) - assert self.neo.get_property_data_types(record_1) == [{"id": "STRING"}] - assert self.neo.get_property_data_types(record_2) == [ - {"id": "STRING"}, - {"amount": "INTEGER"}, - ] - - def test_get_properties(self): - assert self.neo.get_properties(self.record_1) == { - "id": { - "unique": True, - "indexed": True, - "type": "STRING", - "existence": False, + }, + { + "key": "RELATIONSHIP_1", + "value": { + "count": 4, + "type": "relationship", + "properties": { + "Relationship1_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, }, - } - assert self.neo.get_properties(self.record_2) == self.record_2.get( - "properties", None - ) + }, + ] + + +def test_process_nodes(source): + df = source.process_nodes(data=data()) + assert type(df) is pd.DataFrame + + +def test_process_relationships(source): + df = source.process_relationships( + data=data(), node_df=source.process_nodes(data=data()) + ) + assert type(df) is pd.DataFrame + + +def test_get_obj_type(source): + results = data() + assert source.get_obj_type(results[0]["value"]) == "node" + assert source.get_obj_type(results[1]["value"]) == "node" + assert source.get_obj_type(results[2]["value"]) == "relationship" + + +def test_get_node_description(source): + results = data() + df = source.process_nodes(data=data()) + assert ( + source.get_node_description(results[0], df) + == "(Node_1)<-[RELATIONSHIP_1]-(Node_2)" + ) + assert ( + source.get_node_description(results[1], df) + == "(Node_2)-[RELATIONSHIP_1]->(Node_2)" + ) + + +def test_get_property_data_types(source): + results = data() + assert source.get_property_data_types(results[0]["value"]["properties"]) == [ + {"Node1_Property1": "DATE"}, + {"Node1_Property2": "STRING"}, + {"Node1_Property3": "STRING"}, + ] + assert source.get_property_data_types(results[1]["value"]["properties"]) == [ + {"Node2_Property1": "DATE"}, + {"Node2_Property2": "STRING"}, + {"Node2_Property3": "STRING"}, + ] + assert source.get_property_data_types(results[2]["value"]["properties"]) == [ + {"Relationship1_Property1": "STRING"} + ] + + +def test_get_properties(source): + results = data() + assert list(source.get_properties(results[0]["value"]).keys()) == [ + "Node1_Property1", + "Node1_Property2", + "Node1_Property3", + ] + assert list(source.get_properties(results[1]["value"]).keys()) == [ + "Node2_Property1", + "Node2_Property2", + "Node2_Property3", + ] + assert list(source.get_properties(results[2]["value"]).keys()) == [ + "Relationship1_Property1" + ] + + +def test_get_relationships(source): + results = data() + record = list( + results[0]["value"]["relationships"].keys() + ) # Get the first key from the dict_keys + assert record == ["RELATIONSHIP_1"] if __name__ == "__main__":