diff --git a/docs/cli.md b/docs/cli.md
index c633b7f4a38ad..1c38077d0d12e 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -115,6 +115,19 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml --dry-run
 datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml -n
 ```
+#### ingest --list-source-runs
+
+The `--list-source-runs` option of the `ingest` command lists the previous runs, displaying their run ID, source name,
+start time, status, and source URN. This command allows you to filter results using the --urn option for URN-based
+filtering or the --source option to filter by source name (partial or complete matches are supported).
+
+```shell
+# List all ingestion runs
+datahub ingest list-source-runs
+# Filter runs by a source name containing "demo"
+datahub ingest list-source-runs --source "demo"
+```
+
 #### ingest --preview
 
 The `--preview` option of the `ingest` command performs all of the ingestion steps, but limits the processing to only
 the first 10 workunits produced by the source.
diff --git a/docs/how/delete-metadata.md b/docs/how/delete-metadata.md
index f720a66ce5765..e36940bf39835 100644
--- a/docs/how/delete-metadata.md
+++ b/docs/how/delete-metadata.md
@@ -4,7 +4,7 @@
 To follow this guide, you'll need the [DataHub CLI](../cli.md).
 :::
 
-There are a two ways to delete metadata from DataHub:
+There are two ways to delete metadata from DataHub:
 
 1. Delete metadata attached to entities by providing a specific urn or filters that identify a set of urns (delete CLI).
 2. Delete metadata created by a single ingestion run (rollback).
@@ -233,7 +233,13 @@ To view the ids of the most recent set of ingestion batches, execute
 ```shell
 datahub ingest list-runs
 ```
-That will print out a table of all the runs. Once you have an idea of which run you want to roll back, run
+That will print out a table of all the runs. To see run statuses or to filter runs by URN/source, run
+
+```shell
+datahub ingest list-source-runs
+```
+
+Once you have an idea of which run you want to roll back, run
 
 ```shell
 datahub ingest show --run-id <run_id>
diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py
index 51f095751f7dd..fcab07a1c2aaf 100644
--- a/metadata-ingestion/src/datahub/cli/ingest_cli.py
+++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -27,6 +27,7 @@
 logger = logging.getLogger(__name__)
 
+INGEST_SRC_TABLE_COLUMNS = ["runId", "source", "startTime", "status", "URN"]
 RUNS_TABLE_COLUMNS = ["runId", "rows", "created at"]
 RUN_TABLE_COLUMNS = ["urn", "aspect name", "created at"]
 
 
@@ -437,6 +438,115 @@ def mcps(path: str) -> None:
     sys.exit(ret)
 
 
+@ingest.command()
+@click.argument("page_offset", type=int, default=0)
+@click.argument("page_size", type=int, default=100)
+@click.option("--urn", type=str, default=None, help="Filter by ingestion source URN.")
+@click.option(
+    "--source", type=str, default=None, help="Filter by ingestion source name."
+)
+@upgrade.check_upgrade
+@telemetry.with_telemetry()
+def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) -> None:
+    """List ingestion source runs with their details, optionally filtered by URN or source."""
+
+    query = """
+    query listIngestionRuns($input: ListIngestionSourcesInput!) {
+        listIngestionSources(input: $input) {
+            ingestionSources {
+                urn
+                name
+                executions {
+                    executionRequests {
+                        id
+                        result {
+                            startTimeMs
+                            status
+                        }
+                    }
+                }
+            }
+        }
+    }
+    """
+
+    # filter by urn and/or source using CONTAINS
+    filters = []
+    if urn:
+        filters.append({"field": "urn", "values": [urn], "condition": "CONTAIN"})
+    if source:
+        filters.append({"field": "name", "values": [source], "condition": "CONTAIN"})
+
+    variables = {
+        "input": {
+            "start": page_offset,
+            "count": page_size,
+            "filters": filters,
+        }
+    }
+
+    client = get_default_graph()
+    session = client._session
+    gms_host = client.config.server
+
+    url = f"{gms_host}/api/graphql"
+    try:
+        response = session.post(url, json={"query": query, "variables": variables})
+        response.raise_for_status()
+    except Exception as e:
+        click.echo(f"Error fetching data: {str(e)}")
+        return
+
+    try:
+        data = response.json()
+    except ValueError:
+        click.echo("Failed to parse JSON response from server.")
+        return
+
+    if not data:
+        click.echo("No response received from the server.")
+        return
+
+    # when urn or source filter does not match, exit gracefully
+    if (
+        not isinstance(data.get("data"), dict)
+        or "listIngestionSources" not in data["data"]
+    ):
+        click.echo("No matching ingestion sources found. Please check your filters.")
+        return
+
+    ingestion_sources = data["data"]["listIngestionSources"]["ingestionSources"]
+    if not ingestion_sources:
+        click.echo("No ingestion sources or executions found.")
+        return
+
+    rows = []
+    for ingestion_source in ingestion_sources:
+        urn = ingestion_source.get("urn", "N/A")
+        name = ingestion_source.get("name", "N/A")
+
+        executions = ingestion_source.get("executions", {}).get("executionRequests", [])
+        for execution in executions:
+            execution_id = execution.get("id", "N/A")
+            start_time = execution.get("result", {}).get("startTimeMs", "N/A")
+            start_time = (
+                datetime.fromtimestamp(start_time / 1000).strftime("%Y-%m-%d %H:%M:%S")
+                if start_time != "N/A"
+                else "N/A"
+            )
+            status = execution.get("result", {}).get("status", "N/A")
+
+            rows.append([execution_id, name, start_time, status, urn])
+
+    click.echo(
+        tabulate(
+            rows,
+            headers=INGEST_SRC_TABLE_COLUMNS,
+            tablefmt="grid",
+        )
+    )
+
+
 @ingest.command()
 @click.argument("page_offset", type=int, default=0)
 @click.argument("page_size", type=int, default=100)