feat(metadata-ingestion): Add support for druid #2235

Merged
merged 7 commits into from
Mar 18, 2021
24 changes: 24 additions & 0 deletions metadata-ingestion/README.md
@@ -93,6 +93,7 @@ We use a plugin architecture so that you can install only the dependencies you a
| snowflake | `pip install -e '.[snowflake]'` | Snowflake source |
| ldap | `pip install -e '.[ldap]'` ([extra requirements]) | LDAP source |
| kafka | `pip install -e '.[kafka]'` | Kafka source |
| druid | `pip install -e '.[druid]'` | Druid Source |
| datahub-rest | `pip install -e '.[datahub-rest]'` | DataHub sink over REST API |
| datahub-kafka | `pip install -e '.[datahub-kafka]'` | DataHub sink over Kafka |

@@ -343,6 +344,29 @@ source:
# table_pattern/schema_pattern is same as above
```

### Druid `druid`

Extracts:

- List of databases, schema, and tables
- Column types associated with each table

**Note** If you add a schema pattern, it is important to explicitly define a deny schema pattern for the internal Druid databases (lookup & sys);
otherwise the crawler may crash before processing the relevant databases.
This deny pattern is defined by default but is overridden by user-submitted configurations.

```yml
source:
  type: druid
  config:
    # Point to broker address
    host_port: localhost:8082
    schema_pattern:
      deny:
        - "^(lookup|sys).*"
    # options is same as above
```

Review comment (Contributor): I wonder if we should "hardcode" this inside the Druid config class so this is always added on regardless of what the user configures.
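To make the note and the review comment concrete, here is a minimal sketch of how the deny pattern filters schema names, and of one way the pattern could always be applied on top of the user's configuration. It uses only the standard `re` module and assumes deny patterns are matched from the start of the schema name. The helper names `is_denied` and `with_internal_deny` are hypothetical and are not part of DataHub.

```python
import re
from typing import Iterable, List

# Pattern copied from the config above: matches names starting with "lookup" or "sys".
INTERNAL_SCHEMA_DENY = "^(lookup|sys).*"


def is_denied(schema: str, deny_patterns: Iterable[str]) -> bool:
    """Return True if any deny pattern matches at the start of the schema name."""
    # Assumption: patterns are applied with re.match semantics.
    return any(re.match(pattern, schema) for pattern in deny_patterns)


def with_internal_deny(user_deny: Iterable[str]) -> List[str]:
    """Hypothetical helper: keep the user's patterns and always add the internal one."""
    patterns = list(user_deny)
    if INTERNAL_SCHEMA_DENY not in patterns:
        patterns.append(INTERNAL_SCHEMA_DENY)
    return patterns


if __name__ == "__main__":
    deny = with_internal_deny(["^staging_.*"])
    for schema in ["druid", "sys", "lookup", "staging_tmp"]:
        print(schema, "denied" if is_denied(schema, deny) else "allowed")
```

Because the pattern is anchored only at the start, it would also deny any user schema whose name happens to begin with `sys` or `lookup`. That is worth keeping in mind if the pattern is ever made non-overridable.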

### LDAP `ldap`

Extracts:

4 changes: 3 additions & 1 deletion metadata-ingestion/setup.cfg
@@ -42,6 +42,8 @@ ignore_missing_imports = yes
ignore_missing_imports = yes
[mypy-snowflake.*]
ignore_missing_imports = yes
[mypy-pydruid.*]
ignore_missing_imports = yes

[isort]
profile = black
@@ -56,7 +58,7 @@ testpaths =
    tests/integration

[coverage:report]
-fail_under = 80
+fail_under = 75
show_missing = true
exclude_lines =
    pragma: no cover

1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -68,6 +68,7 @@ def get_long_description():
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"snowflake": sql_common | {"snowflake-sqlalchemy"},
"ldap": {"python-ldap>=2.4"},
"druid": sql_common | {"pydruid>=0.6.2"},
# Sink plugins.
"datahub-kafka": kafka_common,
"datahub-rest": {"requests>=2.25.1"},

25 changes: 25 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/druid.py
@@ -0,0 +1,25 @@
# This import verifies that the dependencies are available.
import pydruid # noqa: F401

from datahub.configuration.common import AllowDenyPattern

from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource


class DruidConfig(BasicSQLAlchemyConfig):
    # defaults
    scheme = "druid"
    schema_pattern: AllowDenyPattern = AllowDenyPattern(deny=["^(lookup|sys).*"])

    def get_sql_alchemy_url(self):
        return f"{BasicSQLAlchemyConfig.get_sql_alchemy_url(self)}/druid/v2/sql/"


class DruidSource(SQLAlchemySource):
    def __init__(self, config, ctx):
        super().__init__(config, ctx, "druid")

    @classmethod
    def create(cls, config_dict, ctx):
        config = DruidConfig.parse_obj(config_dict)
        return cls(config, ctx)
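
As a rough illustration of what `get_sql_alchemy_url` produces, the sketch below composes the connection URL for the broker address used in the README example. It assumes the base class returns `scheme://host_port` when no credentials or database are configured; that behaviour is not shown in this diff. The function name `druid_sql_url` is hypothetical.

```python
def druid_sql_url(host_port: str, scheme: str = "druid") -> str:
    """Append Druid's SQL endpoint path to a base URL, as DruidConfig does above."""
    # Assumption: stand-in for BasicSQLAlchemyConfig.get_sql_alchemy_url()
    # when no username, password, or database is set.
    base_url = f"{scheme}://{host_port}"
    return f"{base_url}/druid/v2/sql/"


if __name__ == "__main__":
    print(druid_sql_url("localhost:8082"))
```

The `/druid/v2/sql/` suffix is the path that the override in `DruidConfig` appends, which is why the README tells users to point `host_port` at the broker.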
@@ -64,10 +64,16 @@
except ImportError as e:
    source_registry.register_disabled("kafka", e)


try:
    from .ldap import LDAPSource

    source_registry.register("ldap", LDAPSource)
except ImportError as e:
    source_registry.register_disabled("ldap", e)

try:
    from .druid import DruidSource

    source_registry.register("druid", DruidSource)
except ImportError as e:
    source_registry.register_disabled("druid", e)
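
The try/except blocks above follow an optional-dependency registration pattern: a source is registered if its import succeeds, and recorded as disabled (with the import error) if it does not. The following is a minimal, self-contained sketch of that idea. `SourceRegistry`, `DemoSource`, and the deliberately missing module name are all hypothetical; this is not DataHub's actual registry implementation.

```python
import importlib
from typing import Dict, Union


class SourceRegistry:
    """Hypothetical minimal registry that remembers why a plugin is unavailable."""

    def __init__(self) -> None:
        self._entries: Dict[str, Union[type, Exception]] = {}

    def register(self, key: str, cls: type) -> None:
        self._entries[key] = cls

    def register_disabled(self, key: str, reason: Exception) -> None:
        # Store the original import error so it can be surfaced later.
        self._entries[key] = reason

    def get(self, key: str) -> type:
        entry = self._entries[key]
        if isinstance(entry, Exception):
            raise RuntimeError(
                f"source '{key}' is disabled; install its extra dependencies"
            ) from entry
        return entry


registry = SourceRegistry()

try:
    # Stand-in for `import pydruid`: this module is assumed not to exist.
    importlib.import_module("module_that_is_not_installed_xyz")

    class DemoSource:
        pass

    registry.register("demo", DemoSource)
except ImportError as e:
    registry.register_disabled("demo", e)
```

Deferring the failure until the source is actually requested lets users install only the extras they need, which matches the plugin table in the README.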