Skip to content

Commit

Permalink
feat: connector for Neo4j
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-fullsight committed Nov 21, 2024
1 parent b4499d8 commit aadd71a
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 131 deletions.
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@
"qlik-sense": sqlglot_lib | {"requests", "websocket-client"},
"sigma": sqlglot_lib | {"requests"},
"sac": sac,
"neo4j": {"pandas", "neo4j"},
}

# This is mainly used to exclude plugins from the Docker image.
Expand Down Expand Up @@ -668,6 +669,7 @@
"sigma",
"sac",
"cassandra",
"neo4j",
]
if plugin
for dependency in plugins[plugin]
Expand Down Expand Up @@ -787,6 +789,7 @@
"sigma = datahub.ingestion.source.sigma.sigma:SigmaSource",
"sac = datahub.ingestion.source.sac.sac:SACSource",
"cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource",
"neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource",
],
"datahub.ingestion.transformer.plugins": [
"pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership",
Expand Down
325 changes: 194 additions & 131 deletions metadata-ingestion/tests/unit/test_neo4j_source.py
Original file line number Diff line number Diff line change
@@ -1,155 +1,218 @@
import unittest
from pathlib import Path

import pandas as pd
import pytest

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.neo4j.neo4j_source import Neo4jConfig, Neo4jSource


class TestNeo4j(unittest.TestCase):
def setUp(self):
self.neo = Neo4jSource(Neo4jConfig(), PipelineContext(run_id="test"))
self.record_1 = {
"count": 1,
"labels": [],
"properties": {
"id": {
"unique": True,
"indexed": True,
"type": "STRING",
"existence": False,
},
},
"type": "node",
"relationships": {
"RELATIONSHIP_1": {
"count": 0,
"direction": "out",
"labels": ["Label_1"],
"properties": {},
}
},
}
self.record_2 = {
"count": 2,
"labels": [],
"properties": {
"id": {
"unique": True,
"indexed": True,
"type": "STRING",
"existence": False,
},
"amount": {
"unique": True,
"indexed": True,
"type": "INTEGER",
"existence": False,
},
},
"type": "node",
"relationships": {
"RELATIONSHIP_1": {
"count": 0,
"direction": "out",
"labels": ["Label_1"],
"properties": {},
@pytest.fixture
def tracking_uri(tmp_path: Path) -> str:
# return str(tmp_path / "neo4j")
return "neo4j+ssc://host:7687"


@pytest.fixture
def source(tracking_uri: str) -> Neo4jConfig:
return Neo4jSource(
ctx=PipelineContext(run_id="neo4j-test"),
config=Neo4jConfig(uri=tracking_uri),
)


def data():
return [
{
"key": "Node_1",
"value": {
"count": 433026,
"relationships": {
"RELATIONSHIP_1": {
"count": 1,
"properties": {
"Relationship1_Property1": {
"existence": False,
"type": "STRING",
"indexed": False,
"array": False,
}
},
"direction": "in",
"labels": ["Node_2"],
}
},
"RELATIONSHIP_2": {
"count": 1,
"count": 2,
"properties": {
"Relationship2_Property1": {
"existence": False,
"type": "STRING",
"indexed": False,
"array": False,
}
},
"direction": "in",
"labels": ["Label_1", "Label_2"],
"properties": {},
"labels": ["Node_3"],
},
"type": "node",
"properties": {
"Node1_Property1": {
"existence": False,
"type": "DATE",
"indexed": False,
"unique": False,
},
"Node1_Property2": {
"existence": False,
"type": "STRING",
"indexed": False,
"unique": False,
},
"Node1_Property3": {
"existence": False,
"type": "STRING",
"indexed": False,
"unique": False,
},
},
"labels": [],
},
}
self.record_3 = {"count": 3, "properties": {}, "type": "relationship"}
self.record_4 = {
"RELATIONSHIP_2": {
"count": 4,
"properties": {},
"type": "relationship",
},
{
"key": "Node_2",
"value": {
"count": 3,
"relationships": {
"RELATIONSHIP_1": {
"count": 0,
"count": 1,
"properties": {
"Relationship1_Property1": {
"existence": False,
"type": "STRING",
"indexed": False,
"array": False,
}
},
"direction": "out",
"labels": ["Label_1"],
"properties": {},
"labels": ["Node_2"],
}
},
"type": "node",
"properties": {
"Node2_Property1": {
"existence": False,
"type": "DATE",
"indexed": False,
"unique": False,
},
"RELATIONSHIP_2": {
"count": 1,
"direction": "in",
"labels": ["Label_1", "Label_2"],
"properties": {},
"Node2_Property2": {
"existence": False,
"type": "STRING",
"indexed": False,
"unique": False,
},
"Node2_Property3": {
"existence": False,
"type": "STRING",
"indexed": False,
"unique": False,
},
},
}
}

def create_df(self):
data = {
"key": ["item1", "item2", "item3", "RELATIONSHIP_2"],
"value": [
self.record_1,
self.record_2,
self.record_3,
self.record_4,
],
}
df = pd.DataFrame(data)
return df

def test_get_obj_type(self):
assert self.neo.get_obj_type(self.record_1) == "node"
assert self.neo.get_obj_type(self.record_2) == "node"
assert self.neo.get_obj_type(self.record_3) == "relationship"

def test_get_relationships(self):
assert self.neo.get_relationships(self.record_1, self.create_df()) == {
"RELATIONSHIP_1": {
"count": 0,
"direction": "out",
"labels": ["Label_1"],
"properties": {},
}
}
assert self.neo.get_relationships(self.record_2, self.create_df()) == {
"RELATIONSHIP_1": {
"count": 0,
"direction": "out",
"labels": ["Label_1"],
"properties": {},
},
"RELATIONSHIP_2": {
"count": 1,
"direction": "in",
"labels": ["Label_1", "Label_2"],
"properties": {},
"labels": [],
},
}
assert self.neo.get_relationships(self.record_3, self.create_df()) is None

def test_get_property_data_types(self):
record_1 = self.record_1.get("properties", None)
record_2 = self.record_2.get("properties", None)
assert self.neo.get_property_data_types(record_1) == [{"id": "STRING"}]
assert self.neo.get_property_data_types(record_2) == [
{"id": "STRING"},
{"amount": "INTEGER"},
]

def test_get_properties(self):
assert self.neo.get_properties(self.record_1) == {
"id": {
"unique": True,
"indexed": True,
"type": "STRING",
"existence": False,
},
{
"key": "RELATIONSHIP_1",
"value": {
"count": 4,
"type": "relationship",
"properties": {
"Relationship1_Property1": {
"existence": False,
"type": "STRING",
"indexed": False,
"array": False,
}
},
},
}
assert self.neo.get_properties(self.record_2) == self.record_2.get(
"properties", None
)
},
]


def test_process_nodes(source):
df = source.process_nodes(data=data())
assert type(df) is pd.DataFrame


def test_process_relationships(source):
df = source.process_relationships(
data=data(), node_df=source.process_nodes(data=data())
)
assert type(df) is pd.DataFrame


def test_get_obj_type(source):
results = data()
assert source.get_obj_type(results[0]["value"]) == "node"
assert source.get_obj_type(results[1]["value"]) == "node"
assert source.get_obj_type(results[2]["value"]) == "relationship"


def test_get_node_description(source):
results = data()
df = source.process_nodes(data=data())
assert (
source.get_node_description(results[0], df)
== "(Node_1)<-[RELATIONSHIP_1]-(Node_2)"
)
assert (
source.get_node_description(results[1], df)
== "(Node_2)-[RELATIONSHIP_1]->(Node_2)"
)


def test_get_property_data_types(source):
results = data()
assert source.get_property_data_types(results[0]["value"]["properties"]) == [
{"Node1_Property1": "DATE"},
{"Node1_Property2": "STRING"},
{"Node1_Property3": "STRING"},
]
assert source.get_property_data_types(results[1]["value"]["properties"]) == [
{"Node2_Property1": "DATE"},
{"Node2_Property2": "STRING"},
{"Node2_Property3": "STRING"},
]
assert source.get_property_data_types(results[2]["value"]["properties"]) == [
{"Relationship1_Property1": "STRING"}
]


def test_get_properties(source):
results = data()
assert list(source.get_properties(results[0]["value"]).keys()) == [
"Node1_Property1",
"Node1_Property2",
"Node1_Property3",
]
assert list(source.get_properties(results[1]["value"]).keys()) == [
"Node2_Property1",
"Node2_Property2",
"Node2_Property3",
]
assert list(source.get_properties(results[2]["value"]).keys()) == [
"Relationship1_Property1"
]


def test_get_relationships(source):
results = data()
record = list(
results[0]["value"]["relationships"].keys()
) # Get the first key from the dict_keys
assert record == ["RELATIONSHIP_1"]


if __name__ == "__main__":
Expand Down

0 comments on commit aadd71a

Please sign in to comment.