Skip to content

Commit

Permalink
feat: connector for Neo4j
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-fullsight committed Oct 10, 2024
1 parent d4e3d1a commit 53c2463
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 19 deletions.
4 changes: 2 additions & 2 deletions metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ source:
uri: 'neo4j+ssc://host:7687'
username: 'neo4j'
password: 'password'
gms_server: &gms_server 'http://localhost:8080'
gms_server: 'http://localhost:8080'
node_tag: 'Node'
relationship_tag: 'Relationship'
environment: 'PROD'

sink:
type: "datahub-rest"
config:
server: *gms_server
server: 'http://localhost:8080'
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ source:
uri: 'neo4j+ssc://host:7687'
username: 'neo4j'
password: 'password'
gms_server: &gms_server 'http://localhost:8080'
gms_server: 'http://localhost:8080'
node_tag: 'Node'
relationship_tag: 'Relationship'
environment: 'PROD'

sink:
type: "datahub-rest"
config:
server: *gms_server
server: 'http://localhost:8080'
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import time
from dataclasses import dataclass
from typing import Iterable, Optional
from typing import Dict, Iterable, Optional, Type, Union

import pandas as pd
from neo4j import GraphDatabase
Expand All @@ -24,6 +24,7 @@
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaFieldDataType
from datahub.metadata.schema_classes import (
AuditStampClass,
BooleanTypeClass,
Expand All @@ -33,7 +34,6 @@
NumberTypeClass,
OtherSchemaClass,
SchemaFieldClass,
SchemaFieldDataTypeClass,
SchemaMetadataClass,
StringTypeClass,
TagAssociationClass,
Expand All @@ -43,6 +43,18 @@
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

_type_mapping: Dict[Union[Type, str], Type] = {
"list": UnionTypeClass,
"boolean": BooleanTypeClass,
"integer": NumberTypeClass,
"local_date_time": DateTypeClass,
"float": NumberTypeClass,
"string": StringTypeClass,
"date": DateTypeClass,
"node": StringTypeClass,
"relationship": StringTypeClass,
}


class Neo4jConfig(ConfigModel):
username: str = Field(default=None, description="Neo4j Username")
Expand All @@ -59,17 +71,6 @@ class Neo4jConfig(ConfigModel):
description="The tag that will be used to show that the Neo4j object is a Relationship",
)
platform: str = Field(default="neo4j", description="Neo4j platform")
type_mapping = {
"string": StringTypeClass(),
"boolean": BooleanTypeClass(),
"float": NumberTypeClass(),
"integer": NumberTypeClass(),
"date": DateTypeClass(),
"relationship": StringTypeClass(),
"node": StringTypeClass(),
"local_date_time": DateTypeClass(),
"list": UnionTypeClass(),
}


@dataclass
Expand All @@ -78,7 +79,7 @@ class Neo4jSourceReport(SourceReport):
obj_created: int = 0


@platform_name("Metadata File")
@platform_name("Neo4j",id="neo4j")
@config_class(Neo4jConfig)
@support_status(SupportStatus.CERTIFIED)
class Neo4jSource(Source):
Expand All @@ -92,6 +93,10 @@ def create(cls, config_dict, ctx):
config = Neo4jConfig.parse_obj(config_dict)
return cls(ctx, config)

def get_field_type(self, attribute_type: Union[type, str]) -> SchemaFieldDataType:
type_class: Optional[type] = _type_mapping.get(attribute_type)
return SchemaFieldDataType(type=type_class())

def get_schema_field_class(
self, col_name: str, col_type: str, **kwargs
) -> SchemaFieldClass:
Expand All @@ -101,7 +106,7 @@ def get_schema_field_class(
col_type = col_type
return SchemaFieldClass(
fieldPath=col_name,
type=SchemaFieldDataTypeClass(type=self.config.type_mapping[col_type]),
type=self.get_field_type(col_type),
nativeDataType=col_type,
description=col_type.upper()
if col_type in ("node", "relationship")
Expand Down Expand Up @@ -312,6 +317,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield MetadataWorkUnit(
id=row["key"],
mcp_raw=self.generate_neo4j_object(
# mcp=self.generate_neo4j_object(
columns=row["property_data_types"],
dataset=row["key"],
platform=self.config.platform,
Expand Down

0 comments on commit 53c2463

Please sign in to comment.