Commit

Merge branch 'master' into kafka_schemas_flag
skrydal authored Dec 11, 2024
2 parents 3b5bc58 + e6cc676 commit 609fda6
Showing 5 changed files with 16 additions and 7 deletions.
@@ -181,7 +181,7 @@
"displayName": "Athena",
"description": "Import Schemas, Tables, Views, and lineage to S3 from Athena.",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/athena/",
"recipe": "source:\n type: athena\n config:\n # Coordinates\n aws_region: my_aws_region\n work_group: primary\n\n # Options\n s3_staging_dir: \"s3://my_staging_athena_results_bucket/results/\""
"recipe": "source:\n type: athena\n config:\n # AWS Keys (Optional - Required only if local aws credentials are not set)\n username: aws_access_key_id\n password: aws_secret_access_key\n # Coordinates\n aws_region: my_aws_region\n work_group: primary\n\n # Options\n s3_staging_dir: \"s3://my_staging_athena_results_bucket/results/\""
},
{
"urn": "urn:li:dataPlatform:clickhouse",
metadata-ingestion/docs/sources/athena/athena_recipe.yml (5 additions, 0 deletions)
@@ -1,6 +1,11 @@
 source:
   type: athena
   config:
+
+    # AWS Keys (Optional - Required only if local aws credentials are not set)
+    username: my_aws_access_key_id
+    password: my_aws_secret_access_key
+
     # Coordinates
     aws_region: my_aws_region
     work_group: primary
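As a usage note, a recipe like the one above can also be run programmatically. The sketch below is illustrative only: it assumes the Pipeline helper from datahub.ingestion.run.pipeline and a datahub-rest sink at http://localhost:8080, and the credential values are placeholders (the username/password keys are only required when local AWS credentials are not configured).

from datahub.ingestion.run.pipeline import Pipeline

# Illustrative sketch: run the Athena recipe above from Python.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "athena",
            "config": {
                # AWS Keys (optional - only needed if local AWS credentials are not set)
                "username": "my_aws_access_key_id",
                "password": "my_aws_secret_access_key",
                # Coordinates
                "aws_region": "my_aws_region",
                "work_group": "primary",
                # Options
                "s3_staging_dir": "s3://my_staging_athena_results_bucket/results/",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()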
metadata-ingestion/src/datahub/ingestion/source/abs/source.py (4 additions, 0 deletions)
@@ -201,6 +201,10 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
             ).infer_schema(file)
         elif extension == ".json":
             fields = json.JsonInferrer().infer_schema(file)
+        elif extension == ".jsonl":
+            fields = json.JsonInferrer(
+                max_rows=self.source_config.max_rows, format="jsonl"
+            ).infer_schema(file)
         elif extension == ".avro":
             fields = avro.AvroInferrer().infer_schema(file)
         else:
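The new branch above routes .jsonl files through the JSON inferrer in line-delimited mode, sampling at most source_config.max_rows rows. Below is a minimal, self-contained sketch of that extension dispatch; it assumes the inferrer classes live under datahub.ingestion.source.schema_inference and expose infer_schema(file) as in the diff, and the standalone infer_fields helper is hypothetical.

from typing import IO, List

from datahub.ingestion.source.schema_inference import avro, json


def infer_fields(file: IO[bytes], extension: str, max_rows: int) -> List:
    # Dispatch schema inference on the file extension (mirrors the logic above).
    if extension == ".json":
        return json.JsonInferrer().infer_schema(file)
    elif extension == ".jsonl":
        # Line-delimited JSON: reuse the JSON inferrer, capped at max_rows sampled rows.
        return json.JsonInferrer(max_rows=max_rows, format="jsonl").infer_schema(file)
    elif extension == ".avro":
        return avro.AvroInferrer().infer_schema(file)
    raise ValueError(f"Unsupported extension for schema inference: {extension}")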
metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py (5 additions, 5 deletions)
@@ -153,11 +153,6 @@ def get_workunits_internal(
                 self.truncate_indices()
             except Exception as e:
                 self.report.failure("While trying to truncate indices ", exc=e)
-        if self.dataprocess_cleanup:
-            try:
-                yield from self.dataprocess_cleanup.get_workunits_internal()
-            except Exception as e:
-                self.report.failure("While trying to cleanup data process ", exc=e)
         if self.soft_deleted_entities_cleanup:
             try:
                 self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
@@ -170,6 +165,11 @@ def get_workunits_internal(
                 self.execution_request_cleanup.run()
             except Exception as e:
                 self.report.failure("While trying to cleanup execution request ", exc=e)
+        if self.dataprocess_cleanup:
+            try:
+                yield from self.dataprocess_cleanup.get_workunits_internal()
+            except Exception as e:
+                self.report.failure("While trying to cleanup data process ", exc=e)
         yield from []

     def truncate_indices(self) -> None:
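Taken together, the two hunks above move data process cleanup to the end: soft-deleted entity cleanup and execution request cleanup now run first, and each step keeps its own try/except so a failure in one does not skip the rest. A simplified sketch of the resulting order (guards and messages approximate what the diff shows; surrounding config checks are omitted):

def get_workunits_internal(self):
    # 1. Truncate indices, reporting (not raising) on failure.
    try:
        self.truncate_indices()
    except Exception as e:
        self.report.failure("While trying to truncate indices ", exc=e)
    # 2. Soft-deleted entity cleanup (failure message approximated here).
    if self.soft_deleted_entities_cleanup:
        try:
            self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
        except Exception as e:
            self.report.failure("While trying to cleanup soft deleted entities ", exc=e)
    # 3. Execution request cleanup.
    if self.execution_request_cleanup:
        try:
            self.execution_request_cleanup.run()
        except Exception as e:
            self.report.failure("While trying to cleanup execution request ", exc=e)
    # 4. Data process cleanup, now the last step after this commit.
    if self.dataprocess_cleanup:
        try:
            yield from self.dataprocess_cleanup.get_workunits_internal()
        except Exception as e:
            self.report.failure("While trying to cleanup data process ", exc=e)
    yield from []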
@@ -60,7 +60,7 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel):
description="Query to filter entities",
)
limit_entities_delete: Optional[int] = Field(
10000, description="Max number of entities to delete."
25000, description="Max number of entities to delete."
)

runtime_limit_seconds: Optional[int] = Field(
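The only change here raises the default cap on soft-deleted entities removed per run from 10000 to 25000. A minimal sketch of how that Field default behaves, using plain pydantic instead of DataHub's ConfigModel so the example stays self-contained; the field name and default come from the diff, the rest is illustrative:

from typing import Optional

from pydantic import BaseModel, Field


class CleanupConfigSketch(BaseModel):
    # Mirrors limit_entities_delete from SoftDeletedEntitiesCleanupConfig.
    limit_entities_delete: Optional[int] = Field(
        25000, description="Max number of entities to delete."
    )


print(CleanupConfigSketch().limit_entities_delete)  # 25000 (new default)
print(CleanupConfigSketch(limit_entities_delete=5000).limit_entities_delete)  # 5000 (override)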
