diff --git a/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml b/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml index 71202d4216df9..af5985b1575e2 100644 --- a/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml +++ b/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml @@ -4,7 +4,7 @@ source: uri: 'neo4j+ssc://host:7687' username: 'neo4j' password: 'password' - gms_server: &gms_server 'http://localhost:8080' + gms_server: 'http://localhost:8080' node_tag: 'Node' relationship_tag: 'Relationship' environment: 'PROD' @@ -12,4 +12,4 @@ source: sink: type: "datahub-rest" config: - server: *gms_server \ No newline at end of file + server: 'http://localhost:8080' \ No newline at end of file diff --git a/metadata-ingestion/examples/recipes/neo4j_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/neo4j_to_datahub.dhub.yaml index 71202d4216df9..af5985b1575e2 100644 --- a/metadata-ingestion/examples/recipes/neo4j_to_datahub.dhub.yaml +++ b/metadata-ingestion/examples/recipes/neo4j_to_datahub.dhub.yaml @@ -4,7 +4,7 @@ source: uri: 'neo4j+ssc://host:7687' username: 'neo4j' password: 'password' - gms_server: &gms_server 'http://localhost:8080' + gms_server: 'http://localhost:8080' node_tag: 'Node' relationship_tag: 'Relationship' environment: 'PROD' @@ -12,4 +12,4 @@ source: sink: type: "datahub-rest" config: - server: *gms_server \ No newline at end of file + server: 'http://localhost:8080' \ No newline at end of file diff --git a/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py b/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py index becea2c3bd763..de4790d756dcd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py @@ -1,7 +1,7 @@ import logging import time from dataclasses import dataclass -from typing import Iterable, Optional +from typing import Dict, Iterable, Optional, Type, Union import pandas as pd from neo4j import GraphDatabase @@ -24,6 +24,7 @@ from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph +from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaFieldDataType from datahub.metadata.schema_classes import ( AuditStampClass, BooleanTypeClass, @@ -33,7 +34,6 @@ NumberTypeClass, OtherSchemaClass, SchemaFieldClass, - SchemaFieldDataTypeClass, SchemaMetadataClass, StringTypeClass, TagAssociationClass, @@ -43,6 +43,18 @@ log = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) +_type_mapping: Dict[Union[Type, str], Type] = { + "list": UnionTypeClass, + "boolean": BooleanTypeClass, + "integer": NumberTypeClass, + "local_date_time": DateTypeClass, + "float": NumberTypeClass, + "string": StringTypeClass, + "date": DateTypeClass, + "node": StringTypeClass, + "relationship": StringTypeClass, +} + class Neo4jConfig(ConfigModel): username: str = Field(default=None, description="Neo4j Username") @@ -59,17 +71,6 @@ class Neo4jConfig(ConfigModel): description="The tag that will be used to show that the Neo4j object is a Relationship", ) platform: str = Field(default="neo4j", description="Neo4j platform") - type_mapping = { - "string": StringTypeClass(), - "boolean": BooleanTypeClass(), - "float": NumberTypeClass(), - "integer": NumberTypeClass(), - "date": DateTypeClass(), - "relationship": StringTypeClass(), - "node": StringTypeClass(), - "local_date_time": DateTypeClass(), - "list": UnionTypeClass(), - } @dataclass @@ -78,7 +79,7 @@ class Neo4jSourceReport(SourceReport): obj_created: int = 0 -@platform_name("Metadata File") +@platform_name("Neo4j",id="neo4j") @config_class(Neo4jConfig) @support_status(SupportStatus.CERTIFIED) class Neo4jSource(Source): @@ -92,6 +93,10 @@ def create(cls, config_dict, ctx): config = Neo4jConfig.parse_obj(config_dict) return cls(ctx, config) + def get_field_type(self, attribute_type: Union[type, str]) -> SchemaFieldDataType: + type_class: Optional[type] = _type_mapping.get(attribute_type) + return SchemaFieldDataType(type=type_class()) + def get_schema_field_class( self, col_name: str, col_type: str, **kwargs ) -> SchemaFieldClass: @@ -101,7 +106,7 @@ def get_schema_field_class( col_type = col_type return SchemaFieldClass( fieldPath=col_name, - type=SchemaFieldDataTypeClass(type=self.config.type_mapping[col_type]), + type=self.get_field_type(col_type), nativeDataType=col_type, description=col_type.upper() if col_type in ("node", "relationship") @@ -312,6 +317,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: yield MetadataWorkUnit( id=row["key"], mcp_raw=self.generate_neo4j_object( + # mcp=self.generate_neo4j_object( columns=row["property_data_types"], dataset=row["key"], platform=self.config.platform,