Skip to content

Commit

Permalink
feat(cli): added cli option for ingestion source (datahub-project#11980)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinkarchacryl authored Dec 16, 2024
1 parent 6b8d21a commit d0b4f7a
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 2 deletions.
13 changes: 13 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml --dry-run
datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml -n
```

#### ingest list-source-runs

The `list-source-runs` sub-command of `ingest` lists the previous runs, displaying their run ID, source name,
start time, status, and source URN. You can filter the results using the `--urn` option for URN-based
filtering, or the `--source` option to filter by source name (partial or complete matches are supported).

```shell
# List all ingestion runs
datahub ingest list-source-runs
# Filter runs by a source name containing "demo"
datahub ingest list-source-runs --source "demo"
```

#### ingest --preview

The `--preview` option of the `ingest` command performs all of the ingestion steps, but limits the processing to only the first 10 workunits produced by the source.
Expand Down
10 changes: 8 additions & 2 deletions docs/how/delete-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
To follow this guide, you'll need the [DataHub CLI](../cli.md).
:::

There are a two ways to delete metadata from DataHub:
There are two ways to delete metadata from DataHub:

1. Delete metadata attached to entities by providing a specific urn or filters that identify a set of urns (delete CLI).
2. Delete metadata created by a single ingestion run (rollback).
Expand Down Expand Up @@ -233,7 +233,13 @@ To view the ids of the most recent set of ingestion batches, execute
datahub ingest list-runs
```

That will print out a table of all the runs. Once you have an idea of which run you want to roll back, run
That will print out a table of all the runs. To see run statuses, or to filter runs by URN or source name, run

```shell
datahub ingest list-source-runs
```

Once you have an idea of which run you want to roll back, run

```shell
datahub ingest show --run-id <run-id>
Expand Down
110 changes: 110 additions & 0 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

logger = logging.getLogger(__name__)

# Column headers for the `ingest list-source-runs` output table.
INGEST_SRC_TABLE_COLUMNS = ["runId", "source", "startTime", "status", "URN"]
# Column headers for other `ingest` table outputs (used by commands outside this
# view — presumably `list-runs` and `show`; verify against their bodies).
RUNS_TABLE_COLUMNS = ["runId", "rows", "created at"]
RUN_TABLE_COLUMNS = ["urn", "aspect name", "created at"]

Expand Down Expand Up @@ -437,6 +438,115 @@ def mcps(path: str) -> None:
sys.exit(ret)


@ingest.command()
@click.argument("page_offset", type=int, default=0)
@click.argument("page_size", type=int, default=100)
@click.option("--urn", type=str, default=None, help="Filter by ingestion source URN.")
@click.option(
    "--source", type=str, default=None, help="Filter by ingestion source name."
)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) -> None:
    """List ingestion source runs with their details, optionally filtered by URN or source.

    Prints a table with one row per execution request (run ID, source name,
    start time, status, source URN) of every matching ingestion source.
    Errors are reported via click.echo and the command returns without raising.
    """

    query = """
    query listIngestionRuns($input: ListIngestionSourcesInput!) {
      listIngestionSources(input: $input) {
        ingestionSources {
          urn
          name
          executions {
            executionRequests {
              id
              result {
                startTimeMs
                status
              }
            }
          }
        }
      }
    }
    """

    # filter by urn and/or source using CONTAINS (partial matches supported)
    filters = []
    if urn:
        filters.append({"field": "urn", "values": [urn], "condition": "CONTAIN"})
    if source:
        filters.append({"field": "name", "values": [source], "condition": "CONTAIN"})

    variables = {
        "input": {
            "start": page_offset,
            "count": page_size,
            "filters": filters,
        }
    }

    client = get_default_graph()
    session = client._session
    gms_host = client.config.server

    url = f"{gms_host}/api/graphql"
    try:
        response = session.post(url, json={"query": query, "variables": variables})
        response.raise_for_status()
    except Exception as e:
        click.echo(f"Error fetching data: {str(e)}")
        return

    try:
        data = response.json()
    except ValueError:
        click.echo("Failed to parse JSON response from server.")
        return

    if not data:
        click.echo("No response received from the server.")
        return

    # when urn or source filter does not match, exit gracefully
    if (
        not isinstance(data.get("data"), dict)
        or "listIngestionSources" not in data["data"]
    ):
        click.echo("No matching ingestion sources found. Please check your filters.")
        return

    ingestion_sources = data["data"]["listIngestionSources"]["ingestionSources"]
    if not ingestion_sources:
        click.echo("No ingestion sources or executions found.")
        return

    rows = []
    for ingestion_source in ingestion_sources:
        # Distinct names: the original shadowed the `urn` parameter here.
        source_urn = ingestion_source.get("urn", "N/A")
        source_name = ingestion_source.get("name", "N/A")

        # `executions` may be null in the JSON; `.get(..., {})` does not guard
        # against an explicit null, so fall back with `or`.
        executions = ingestion_source.get("executions") or {}
        for execution in executions.get("executionRequests") or []:
            execution_id = execution.get("id", "N/A")
            # `result` is null for runs that have not produced one yet.
            result = execution.get("result") or {}
            start_time_ms = result.get("startTimeMs")
            # Only format when the server returned a numeric epoch-millis value;
            # the original crashed on `None / 1000` for null startTimeMs.
            start_time = (
                datetime.fromtimestamp(start_time_ms / 1000).strftime(
                    "%Y-%m-%d %H:%M:%S"
                )
                if isinstance(start_time_ms, (int, float))
                else "N/A"
            )
            status = result.get("status", "N/A")

            rows.append([execution_id, source_name, start_time, status, source_urn])

    if not rows:
        # Sources matched the filters but none had any execution requests:
        # print a message instead of an empty table.
        click.echo("No ingestion sources or executions found.")
        return

    click.echo(
        tabulate(
            rows,
            headers=INGEST_SRC_TABLE_COLUMNS,
            tablefmt="grid",
        )
    )


@ingest.command()
@click.argument("page_offset", type=int, default=0)
@click.argument("page_size", type=int, default=100)
Expand Down

0 comments on commit d0b4f7a

Please sign in to comment.