From 3a815bc46f5c8f6e4ee9d27a999f1cbd4943e145 Mon Sep 17 00:00:00 2001 From: Sultan Iman <354868+sultaniman@users.noreply.github.com> Date: Thu, 21 Mar 2024 15:13:36 +0100 Subject: [PATCH] Streamlit improvements (#1060) * Streamlit improvements * Refactor menu and create custom widgets * Add tag component * Add destination name to sidebar * Adjust headers * Set color scheme * Fix linting issues * Move import dlt to the top * Use defaul tag type * Use lower contrast shade of white * Use smaller heading element * Handle dummy destination errors and implement custom light and dark modes * Cleanup streamlit app code * Fix linting issues * Fix linting issues * Extract components from streamlit explorer code * Cleanup redundant data display * Add tag bold text option * Cast label to string * Rework document sections and display resource incremental if given * Do not display primary and merge keys if they are not specified * Integrate streamlit app tests * Fix mypy issue * Add general rendering test checks for streamlit * Set default color mode as light * Display resource state in expandable area * More rendering checks * Cleanup tests * Add test case for streamlit app with dummy dlt destination * Ignore B015 error * Fix linting errors * Add hot reload support for streamlit via DLT_STREAMLIT_HOT_RELOAD variable * Move non-essential info from sidebar to load info page * Expand pipeline summary block by default * Sort table by column names * Fix mypy errors * Pass pipelines dir and use bool argument for streamlit hot reload * Keep resource state expanded if it is missing the state * Use DLT_PIPELINES_DIR in load info as well * Remove unused import * Do not sort table column * Extract pipeline attaching logic * Use pipeline name from cli arguments for load_info page * Move pipeline_state_info into blocks * Remove comment * Move menu into blocks * Extract pipeline loading logic * Show simple unordered list with pipeline summary * Cleanup redundant code * Adjust tests * Remove 
unused code * Move dashboard into pages * Refactor querying logic and stop using deprecated experimental caching for streamlit * Fix mypy errors * Use get_dlt_data_dir to resolve pipelines_dir * Add more tests and checks * Pass DLT_PIPELINES_DIR instead of modifying DLT_DATA_DIR * Restore os environment after streamlit app exits * Remove max-parallel for linting * Allow linting to fail fast and fix linting errors * Fix mypy errors * Format code * Show message when pipelines dir is passed * Show message if pipelines_dir is passed * Copy streamlit package check to streamlit_app module __init__ * Adjust mypy ignore * Fix mypy issues * moves load info around * Pass pipeline params after terminator * Remove info message when pipelines dir is passed * Restor system args after test * Fix linting error * Adjust streamlit command arguments passing * Add comments * Manually enforce column order --------- Co-authored-by: Marcin Rudolf --- .github/workflows/lint.yml | 3 +- dlt/cli/_dlt.py | 6 + dlt/cli/pipeline_command.py | 64 +++++++-- dlt/helpers/streamlit_app/__init__.py | 11 ++ dlt/helpers/streamlit_app/blocks/__init__.py | 0 dlt/helpers/streamlit_app/blocks/load_info.py | 40 ++++++ dlt/helpers/streamlit_app/blocks/menu.py | 14 ++ dlt/helpers/streamlit_app/blocks/query.py | 57 ++++++++ .../streamlit_app/blocks/resource_state.py | 29 ++++ dlt/helpers/streamlit_app/blocks/show_data.py | 21 +++ .../streamlit_app/blocks/table_hints.py | 80 +++++++++++ dlt/helpers/streamlit_app/index.py | 6 + dlt/helpers/streamlit_app/pages/__init__.py | 0 dlt/helpers/streamlit_app/pages/dashboard.py | 53 +++++++ dlt/helpers/streamlit_app/pages/load_info.py | 130 ++++++++++++++++++ dlt/helpers/streamlit_app/theme.py | 29 ++++ dlt/helpers/streamlit_app/utils.py | 77 +++++++++++ dlt/helpers/streamlit_app/widgets/__init__.py | 6 + .../widgets/color_mode_selector.py | 34 +++++ dlt/helpers/streamlit_app/widgets/logo.py | 46 +++++++ dlt/helpers/streamlit_app/widgets/schema.py | 21 +++ 
dlt/helpers/streamlit_app/widgets/stats.py | 58 ++++++++ dlt/helpers/streamlit_app/widgets/summary.py | 21 +++ dlt/helpers/streamlit_app/widgets/tags.py | 41 ++++++ .../test_streamlit_show_resources.py | 119 ++++++++++++++-- 25 files changed, 940 insertions(+), 26 deletions(-) create mode 100644 dlt/helpers/streamlit_app/__init__.py create mode 100644 dlt/helpers/streamlit_app/blocks/__init__.py create mode 100644 dlt/helpers/streamlit_app/blocks/load_info.py create mode 100644 dlt/helpers/streamlit_app/blocks/menu.py create mode 100644 dlt/helpers/streamlit_app/blocks/query.py create mode 100644 dlt/helpers/streamlit_app/blocks/resource_state.py create mode 100644 dlt/helpers/streamlit_app/blocks/show_data.py create mode 100644 dlt/helpers/streamlit_app/blocks/table_hints.py create mode 100644 dlt/helpers/streamlit_app/index.py create mode 100644 dlt/helpers/streamlit_app/pages/__init__.py create mode 100644 dlt/helpers/streamlit_app/pages/dashboard.py create mode 100644 dlt/helpers/streamlit_app/pages/load_info.py create mode 100644 dlt/helpers/streamlit_app/theme.py create mode 100644 dlt/helpers/streamlit_app/utils.py create mode 100644 dlt/helpers/streamlit_app/widgets/__init__.py create mode 100644 dlt/helpers/streamlit_app/widgets/color_mode_selector.py create mode 100644 dlt/helpers/streamlit_app/widgets/logo.py create mode 100644 dlt/helpers/streamlit_app/widgets/schema.py create mode 100644 dlt/helpers/streamlit_app/widgets/stats.py create mode 100644 dlt/helpers/streamlit_app/widgets/summary.py create mode 100644 dlt/helpers/streamlit_app/widgets/tags.py diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 67b172db5e..317124f8c8 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -21,8 +21,7 @@ jobs: needs: get_docs_changes if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' strategy: - fail-fast: false - max-parallel: 14 + fail-fast: true matrix: os: - ubuntu-latest diff --git a/dlt/cli/_dlt.py 
b/dlt/cli/_dlt.py index 9894227046..2332c0286c 100644 --- a/dlt/cli/_dlt.py +++ b/dlt/cli/_dlt.py @@ -443,6 +443,12 @@ def main() -> int: pipe_cmd.add_argument( "--list-pipelines", "-l", default=False, action="store_true", help="List local pipelines" ) + pipe_cmd.add_argument( + "--hot-reload", + default=False, + action="store_true", + help="Reload streamlit app (for core development)", + ) pipe_cmd.add_argument("pipeline_name", nargs="?", help="Pipeline name") pipe_cmd.add_argument("--pipelines-dir", help="Pipelines working directory", default=None) pipe_cmd.add_argument( diff --git a/dlt/cli/pipeline_command.py b/dlt/cli/pipeline_command.py index 9981fa8493..0eb73ad7a8 100644 --- a/dlt/cli/pipeline_command.py +++ b/dlt/cli/pipeline_command.py @@ -1,5 +1,5 @@ import yaml -from typing import Any, Sequence, Tuple +from typing import Any, Optional, Sequence, Tuple import dlt from dlt.cli.exceptions import CliCommandException @@ -15,6 +15,7 @@ from dlt.cli import echo as fmt + DLT_PIPELINE_COMMAND_DOCS_URL = "https://dlthub.com/docs/reference/command-line-interface" @@ -25,6 +26,7 @@ def pipeline_command( verbosity: int, dataset_name: str = None, destination: TDestinationReferenceArg = None, + hot_reload: Optional[bool] = False, **command_kwargs: Any, ) -> None: if operation == "list": @@ -48,7 +50,8 @@ def pipeline_command( raise fmt.warning(str(e)) if not fmt.confirm( - "Do you want to attempt to restore the pipeline state from destination?", default=False + "Do you want to attempt to restore the pipeline state from destination?", + default=False, ): return destination = destination or fmt.text_input( @@ -58,7 +61,10 @@ def pipeline_command( f"Enter dataset name for pipeline {fmt.bold(pipeline_name)}" ) p = dlt.pipeline( - pipeline_name, pipelines_dir, destination=destination, dataset_name=dataset_name + pipeline_name, + pipelines_dir, + destination=destination, + dataset_name=dataset_name, ) p.sync_destination() if p.first_run: @@ -101,13 +107,29 @@ def 
_display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]: if operation == "show": from dlt.common.runtime import signals - from dlt.helpers import streamlit_helper + from dlt.helpers.streamlit_app import index with signals.delayed_signals(): + streamlit_cmd = [ + "streamlit", + "run", + index.__file__, + "--client.showSidebarNavigation", + "false", + ] + + if hot_reload: + streamlit_cmd.append("--server.runOnSave") + streamlit_cmd.append("true") + + streamlit_cmd.append("--") + streamlit_cmd.append(pipeline_name) + if pipelines_dir: + streamlit_cmd.append("--pipelines-dir") + streamlit_cmd.append(pipelines_dir) + venv = Venv.restore_current() - for line in iter_stdout( - venv, "streamlit", "run", streamlit_helper.__file__, pipeline_name - ): + for line in iter_stdout(venv, *streamlit_cmd): fmt.echo(line) if operation == "info": @@ -255,7 +277,12 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]: tables = remove_defaults({"tables": package_info.schema_update}) # type: ignore fmt.echo(fmt.bold("Schema update:")) fmt.echo( - yaml.dump(tables, allow_unicode=True, default_flow_style=False, sort_keys=False) + yaml.dump( + tables, + allow_unicode=True, + default_flow_style=False, + sort_keys=False, + ) ) if operation == "schema": @@ -288,20 +315,33 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]: fmt.echo( "About to drop the following data in dataset %s in destination %s:" - % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.destination_name)) + % ( + fmt.bold(drop.info["dataset_name"]), + fmt.bold(p.destination.destination_name), + ) ) fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"])) fmt.echo( - "%s: %s" % (fmt.style("Selected resource(s)", fg="green"), drop.info["resource_names"]) + "%s: %s" + % ( + fmt.style("Selected resource(s)", fg="green"), + drop.info["resource_names"], + ) ) fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), 
drop.info["tables"])) fmt.echo( "%s: %s" - % (fmt.style("Resource(s) state to reset", fg="green"), drop.info["resource_states"]) + % ( + fmt.style("Resource(s) state to reset", fg="green"), + drop.info["resource_states"], + ) ) fmt.echo( "%s: %s" - % (fmt.style("Source state path(s) to reset", fg="green"), drop.info["state_paths"]) + % ( + fmt.style("Source state path(s) to reset", fg="green"), + drop.info["state_paths"], + ) ) # for k, v in drop.info.items(): # fmt.echo("%s: %s" % (fmt.style(k, fg="green"), v)) diff --git a/dlt/helpers/streamlit_app/__init__.py b/dlt/helpers/streamlit_app/__init__.py new file mode 100644 index 0000000000..b304195a5a --- /dev/null +++ b/dlt/helpers/streamlit_app/__init__.py @@ -0,0 +1,11 @@ +from dlt.common.exceptions import MissingDependencyException + +# FIXME: Remove this after implementing package installer +try: + import streamlit +except ModuleNotFoundError: + raise MissingDependencyException( + "DLT Streamlit Helpers", + ["streamlit"], + "DLT Helpers for Streamlit should be run within a streamlit app.", + ) diff --git a/dlt/helpers/streamlit_app/blocks/__init__.py b/dlt/helpers/streamlit_app/blocks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/helpers/streamlit_app/blocks/load_info.py b/dlt/helpers/streamlit_app/blocks/load_info.py new file mode 100644 index 0000000000..134b5ad5a4 --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/load_info.py @@ -0,0 +1,40 @@ +import dlt +import humanize +import streamlit as st + +from dlt.common import pendulum +from dlt.helpers.streamlit_app.utils import query_data_live +from dlt.helpers.streamlit_app.widgets import stat + + +def last_load_info(pipeline: dlt.Pipeline) -> None: + loads_df = query_data_live( + pipeline, + f"SELECT load_id, inserted_at FROM {pipeline.default_schema.loads_table_name} WHERE" + " status = 0 ORDER BY inserted_at DESC LIMIT 101 ", + ) + + if loads_df is None: + st.error( + "Load info is not available", + icon="🚨", + ) + else: + 
loads_no = loads_df.shape[0] + if loads_df.shape[0] > 0: + rel_time = ( + humanize.naturaldelta( + pendulum.now() - pendulum.from_timestamp(loads_df.iloc[0, 1].timestamp()) + ) + + " ago" + ) + last_load_id = loads_df.iloc[0, 0] + if loads_no > 100: + loads_no = "> " + str(loads_no) + else: + rel_time = "---" + last_load_id = "---" + + stat("Last load time", rel_time, border_left_width=4) + stat("Last load id", last_load_id) + stat("Total number of loads", loads_no) diff --git a/dlt/helpers/streamlit_app/blocks/menu.py b/dlt/helpers/streamlit_app/blocks/menu.py new file mode 100644 index 0000000000..b6d0b5f7aa --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/menu.py @@ -0,0 +1,14 @@ +import dlt +import streamlit as st + +from dlt.helpers.streamlit_app.utils import HERE +from dlt.helpers.streamlit_app.widgets import logo, mode_selector +from dlt.helpers.streamlit_app.widgets import pipeline_summary + + +def menu(pipeline: dlt.Pipeline) -> None: + mode_selector() + logo() + st.page_link(f"{HERE}/pages/dashboard.py", label="Explore data", icon="🕹ī¸") + st.page_link(f"{HERE}/pages/load_info.py", label="Load info", icon="💾") + pipeline_summary(pipeline) diff --git a/dlt/helpers/streamlit_app/blocks/query.py b/dlt/helpers/streamlit_app/blocks/query.py new file mode 100644 index 0000000000..a03e9a0cd9 --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/query.py @@ -0,0 +1,57 @@ +from typing import Optional +import dlt +import streamlit as st + +from dlt.common.exceptions import MissingDependencyException +from dlt.helpers.streamlit_app.utils import query_data + + +def maybe_run_query( + pipeline: dlt.Pipeline, + show_charts: bool = True, + example_query: Optional[str] = "", +) -> None: + st.subheader("Run your query") + sql_query = st.text_area("Enter your SQL query", value=example_query) + if st.button("Run Query"): + if sql_query: + try: + # run the query from the text area + df = query_data(pipeline, sql_query, chunk_size=2048) + if df is None: + st.text("No rows 
returned") + else: + rows_count = df.shape[0] + st.text(f"{rows_count} row(s) returned") + st.dataframe(df) + try: + # now if the dataset has supported shape try to display the bar or altair chart + if df.dtypes.shape[0] == 1 and show_charts: + # try barchart + st.bar_chart(df) + if df.dtypes.shape[0] == 2 and show_charts: + # try to import altair charts + try: + import altair as alt + except ModuleNotFoundError: + raise MissingDependencyException( + "DLT Streamlit Helpers", + ["altair"], + "DLT Helpers for Streamlit should be run within a streamlit" + " app.", + ) + + # try altair + bar_chart = ( + alt.Chart(df) + .mark_bar() + .encode( + x=f"{df.columns[1]}:Q", y=alt.Y(f"{df.columns[0]}:N", sort="-x") + ) + ) + st.altair_chart(bar_chart, use_container_width=True) + except Exception as ex: + st.error(f"Chart failed due to: {ex}") + except Exception as ex: + st.text("Exception when running query") + st.exception(ex) diff --git a/dlt/helpers/streamlit_app/blocks/resource_state.py b/dlt/helpers/streamlit_app/blocks/resource_state.py new file mode 100644 index 0000000000..8ea1256a1f --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/resource_state.py @@ -0,0 +1,29 @@ +import dlt +import streamlit as st +import yaml + +from dlt.common import json +from dlt.common.libs.pandas import pandas as pd +from dlt.common.pipeline import resource_state, TSourceState +from dlt.common.schema.utils import group_tables_by_resource +from dlt.helpers.streamlit_app.widgets.tags import tag + + +def resource_state_info( + pipeline: dlt.Pipeline, + schema_name: str, + resource_name: str, +) -> None: + sources_state = pipeline.state.get("sources") or {} + schema = sources_state.get(schema_name) + if not schema: + st.error(f"Schema with name: {schema_name} is not found") + return + + resource = schema["resources"].get(resource_name) + with st.expander("Resource state", expanded=(resource is None)): + if not resource: + st.info(f"{resource_name} is missing resource state") + else: + spec = 
yaml.safe_dump(resource) + st.code(spec, language="yaml") diff --git a/dlt/helpers/streamlit_app/blocks/show_data.py b/dlt/helpers/streamlit_app/blocks/show_data.py new file mode 100644 index 0000000000..7aaab084f3 --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/show_data.py @@ -0,0 +1,21 @@ +from typing import List + +import dlt +import streamlit as st + +from dlt.helpers.streamlit_app.utils import query_data + + +def show_data_button(pipeline: dlt.Pipeline, table_name: str) -> None: + if st.button("SHOW DATA", key=table_name): + df = query_data(pipeline, f"SELECT * FROM {table_name}", chunk_size=2048) + if df is None: + st.text("No rows returned") + else: + rows_count = df.shape[0] + if df.shape[0] < 2048: + st.text(f"All {rows_count} row(s)") + else: + st.text(f"Top {rows_count} row(s)") + + st.dataframe(df) diff --git a/dlt/helpers/streamlit_app/blocks/table_hints.py b/dlt/helpers/streamlit_app/blocks/table_hints.py new file mode 100644 index 0000000000..aefab952e5 --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/table_hints.py @@ -0,0 +1,80 @@ +from typing import Any, Dict, List + +import dlt +import streamlit as st + +from dlt.common.schema.typing import TTableSchema +from dlt.common.utils import flatten_list_or_items +from dlt.helpers.streamlit_app.blocks.resource_state import resource_state_info +from dlt.helpers.streamlit_app.blocks.show_data import show_data_button + + +def list_table_hints(pipeline: dlt.Pipeline, tables: List[TTableSchema]) -> None: + current_schema = st.session_state["schema"] or pipeline.default_schema + if st.session_state["schema"]: + current_schema = st.session_state["schema"] + + for table in tables: + table_hints: List[str] = [] + if "parent" in table: + table_hints.append("parent: **%s**" % table["parent"]) + + if "resource" in table: + table_hints.append("resource: **%s**" % table["resource"]) + + if "write_disposition" in table: + table_hints.append("write disposition: **%s**" % table["write_disposition"]) + + columns 
= table["columns"] + primary_keys: List[str] = list( + flatten_list_or_items( + [ + col_name + for col_name in columns.keys() + if not col_name.startswith("_") + and columns[col_name].get("primary_key") is not None + ] + ) + ) + if primary_keys: + table_hints.append("primary key(s): **%s**" % ", ".join(primary_keys)) + + merge_keys = list( + flatten_list_or_items( + [ + col_name + for col_name in columns.keys() + if not col_name.startswith("_") + and not columns[col_name].get("merge_key") is None # noqa: E714 + ] + ) + ) + + if merge_keys: + table_hints.append("merge key(s): **%s**" % ", ".join(merge_keys)) + + st.subheader(f"Table: {table['name']}", divider=True) + st.markdown(" | ".join(table_hints)) + if "resource" in table: + resource_state_info( + pipeline, + current_schema.name, + table["resource"], + ) + + # table schema contains various hints (like clustering or partition options) + # that we do not want to show in basic view + def essentials_f(c: Any) -> Dict[str, Any]: + essentials: Dict[str, Any] = {} + for k, v in c.items(): + if k in ["name", "data_type", "nullable"]: + essentials[k] = v + + return { + "name": essentials["name"], + "data_type": essentials["data_type"], + "nullable": essentials["nullable"], + } + + st.table(map(essentials_f, table["columns"].values())) + show_data_button(pipeline, table["name"]) diff --git a/dlt/helpers/streamlit_app/index.py b/dlt/helpers/streamlit_app/index.py new file mode 100644 index 0000000000..31fb470640 --- /dev/null +++ b/dlt/helpers/streamlit_app/index.py @@ -0,0 +1,6 @@ +import streamlit as st + +from dlt.helpers.streamlit_app.utils import HERE + +if __name__ == "__main__": + st.switch_page(f"{HERE}/pages/dashboard.py") diff --git a/dlt/helpers/streamlit_app/pages/__init__.py b/dlt/helpers/streamlit_app/pages/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/helpers/streamlit_app/pages/dashboard.py b/dlt/helpers/streamlit_app/pages/dashboard.py new file mode 100644 index 
0000000000..656dd6ecdf --- /dev/null +++ b/dlt/helpers/streamlit_app/pages/dashboard.py @@ -0,0 +1,53 @@ +import dlt +import streamlit as st + +from dlt.helpers.streamlit_app.blocks.query import maybe_run_query +from dlt.helpers.streamlit_app.blocks.table_hints import list_table_hints +from dlt.helpers.streamlit_app.blocks.menu import menu +from dlt.helpers.streamlit_app.utils import render_with_pipeline +from dlt.helpers.streamlit_app.widgets import schema_picker +from dlt.pipeline import Pipeline + + +def write_data_explorer_page( + pipeline: Pipeline, + schema_name: str = None, + example_query: str = "", + show_charts: bool = True, +) -> None: + """Writes Streamlit app page with a schema and live data preview. + + #### Args: + pipeline (Pipeline): Pipeline instance to use. + schema_name (str, optional): Name of the schema to display. If None, default schema is used. + example_query (str, optional): Example query to be displayed in the SQL Query box. + show_charts (bool, optional): Should automatically show charts for the queries from SQL Query box. Defaults to True. 
+ + Raises: + MissingDependencyException: Raised when a particular python dependency is not installed + """ + + st.subheader("Schemas and tables", divider="rainbow") + schema_picker(pipeline) + tables = sorted( + st.session_state["schema"].data_tables(), + key=lambda table: table["name"], + ) + + list_table_hints(pipeline, tables) + maybe_run_query( + pipeline, + show_charts=show_charts, + example_query=example_query, + ) + + +def show(pipeline: dlt.Pipeline) -> None: + with st.sidebar: + menu(pipeline) + + write_data_explorer_page(pipeline) + + +if __name__ == "__main__": + render_with_pipeline(show) diff --git a/dlt/helpers/streamlit_app/pages/load_info.py b/dlt/helpers/streamlit_app/pages/load_info.py new file mode 100644 index 0000000000..ee13cf2531 --- /dev/null +++ b/dlt/helpers/streamlit_app/pages/load_info.py @@ -0,0 +1,130 @@ +import dlt +import streamlit as st + +from dlt.common.configuration.exceptions import ConfigFieldMissingException +from dlt.common.destination.reference import WithStateSync +from dlt.helpers.streamlit_app.blocks.load_info import last_load_info +from dlt.helpers.streamlit_app.blocks.menu import menu +from dlt.helpers.streamlit_app.widgets import stat +from dlt.helpers.streamlit_app.utils import ( + query_data, + query_data_live, + render_with_pipeline, +) +from dlt.pipeline import Pipeline +from dlt.pipeline.exceptions import CannotRestorePipelineException +from dlt.pipeline.state_sync import load_pipeline_state_from_destination + + +def write_load_status_page(pipeline: Pipeline) -> None: + """Display pipeline loading information.""" + + try: + loads_df = query_data_live( + pipeline, + f"SELECT load_id, inserted_at FROM {pipeline.default_schema.loads_table_name} WHERE" + " status = 0 ORDER BY inserted_at DESC LIMIT 101 ", + ) + + if loads_df is not None: + selected_load_id = st.selectbox("Select load id", loads_df) + schema = pipeline.default_schema + + st.markdown("**Number of loaded rows:**") + + # construct a union query + 
query_parts = [] + for table in schema.data_tables(): + if "parent" in table: + continue + table_name = table["name"] + query_parts.append( + f"SELECT '{table_name}' as table_name, COUNT(1) As rows_count FROM" + f" {table_name} WHERE _dlt_load_id = '{selected_load_id}'" + ) + query_parts.append("UNION ALL") + + query_parts.pop() + rows_counts_df = query_data(pipeline, "\n".join(query_parts)) + + st.markdown(f"Rows loaded in **{selected_load_id}**") + st.dataframe(rows_counts_df) + + st.markdown("**Last 100 loads**") + st.dataframe(loads_df) + + st.subheader("Schema updates", divider=True) + schemas_df = query_data_live( + pipeline, + "SELECT schema_name, inserted_at, version, version_hash FROM" + f" {pipeline.default_schema.version_table_name} ORDER BY inserted_at DESC LIMIT" + " 101 ", + ) + st.markdown("**100 recent schema updates**") + st.dataframe(schemas_df) + except CannotRestorePipelineException as restore_ex: + st.error("Seems like the pipeline does not exist. Did you run it at least once?") + st.exception(restore_ex) + + except ConfigFieldMissingException as cf_ex: + st.error( + "Pipeline credentials/configuration is missing. This most often happen when you run the" + " streamlit app from different folder than the `.dlt` with `toml` files resides." + ) + st.text(str(cf_ex)) + + except Exception as ex: + st.error("Pipeline info could not be prepared. 
Did you load the data at least once?") + st.exception(ex) + + +def show_state_versions(pipeline: dlt.Pipeline) -> None: + st.subheader("State info", divider=True) + remote_state = None + with pipeline.destination_client() as client: + if isinstance(client, WithStateSync): + remote_state = load_pipeline_state_from_destination(pipeline.pipeline_name, client) + + local_state = pipeline.state + + remote_state_version = "---" + if remote_state: + remote_state_version = str(remote_state["_state_version"]) + + col1, col2 = st.columns(2) + with col1: + stat( + label="Local version", + value=local_state["_state_version"], + display="block", + border_left_width=4, + ) + + with col2: + stat( + label="Remote version", + value=remote_state_version, + display="block", + border_left_width=4, + ) + + if remote_state_version != str(local_state["_state_version"]): + st.text("") + st.warning( + "Looks like that local state is not yet synchronized or synchronization is disabled", + icon="⚠ī¸", + ) + + +def show(pipeline: dlt.Pipeline) -> None: + st.subheader("Load info", divider="rainbow") + last_load_info(pipeline) + write_load_status_page(pipeline) + show_state_versions(pipeline) + + with st.sidebar: + menu(pipeline) + + +if __name__ == "__main__": + render_with_pipeline(show) diff --git a/dlt/helpers/streamlit_app/theme.py b/dlt/helpers/streamlit_app/theme.py new file mode 100644 index 0000000000..3b6b600a73 --- /dev/null +++ b/dlt/helpers/streamlit_app/theme.py @@ -0,0 +1,29 @@ +import streamlit as st + + +def dark_theme() -> None: + st.config.set_option("theme.base", "dark") + st.config.set_option("theme.primaryColor", "#191937") + + # Main background + st.config.set_option("theme.backgroundColor", "#4C4898") + + # Sidebar + st.config.set_option("theme.secondaryBackgroundColor", "#191937") + + # Text + st.config.set_option("theme.textColor", "#FEFEFA") + + +def light_theme() -> None: + st.config.set_option("theme.base", "light") + st.config.set_option("theme.primaryColor", 
"#333") + + # Main background + st.config.set_option("theme.backgroundColor", "#FEFEFE") + + # Sidebar + st.config.set_option("theme.secondaryBackgroundColor", "#ededed") + + # Text + st.config.set_option("theme.textColor", "#333") diff --git a/dlt/helpers/streamlit_app/utils.py b/dlt/helpers/streamlit_app/utils.py new file mode 100644 index 0000000000..6b2dab495c --- /dev/null +++ b/dlt/helpers/streamlit_app/utils.py @@ -0,0 +1,77 @@ +import argparse +import os + +from pathlib import Path +from typing import Optional, Callable, Tuple + +import dlt +import pandas as pd +import streamlit as st + +from dlt.cli import echo as fmt +from dlt.pipeline.exceptions import SqlClientNotAvailable + +HERE = Path(__file__).absolute().parent + + +def parse_args() -> Tuple[str, str]: + parser = argparse.ArgumentParser() + parser.add_argument("pipeline_name", nargs=1) + parser.add_argument( + "--pipelines-dir", + help="Pipelines working directory", + default=None, + ) + known_options, _ = parser.parse_known_args() + return known_options.pipeline_name[0], known_options.pipelines_dir + + +def render_with_pipeline(render_func: Callable[..., None]) -> None: + pipeline_name, pipelines_dir = parse_args() + if test_pipeline_name := os.getenv("DLT_TEST_PIPELINE_NAME"): + fmt.echo(f"RUNNING TEST PIPELINE: {test_pipeline_name}") + pipeline_name = test_pipeline_name + + st.session_state["pipeline_name"] = pipeline_name + # use pipelines dir from env var or try to resolve it using get_dlt_pipelines_dir + pipeline = dlt.attach(pipeline_name, pipelines_dir=pipelines_dir) + render_func(pipeline) + + +def query_using_cache( + pipeline: dlt.Pipeline, ttl: int +) -> Callable[..., Optional[pd.DataFrame]]: + @st.cache_data(ttl=ttl) + def do_query( # type: ignore[return] + query: str, + schema_name: str = None, + chunk_size: Optional[int] = None, + ) -> Optional[pd.DataFrame]: + try: + with pipeline.sql_client(schema_name) as client: + with client.execute_query(query) as curr: + return 
curr.df(chunk_size=chunk_size) + except SqlClientNotAvailable: + st.error("🚨 Cannot load data - SqlClient not available") + + return do_query # type: ignore + + +def query_data( + pipeline: dlt.Pipeline, + query: str, + schema_name: str = None, + chunk_size: Optional[int] = None, +) -> pd.DataFrame: + query_maker = query_using_cache(pipeline, ttl=600) + return query_maker(query, schema_name, chunk_size=chunk_size) + + +def query_data_live( + pipeline: dlt.Pipeline, + query: str, + schema_name: str = None, + chunk_size: Optional[int] = None, +) -> pd.DataFrame: + query_maker = query_using_cache(pipeline, ttl=5) + return query_maker(query, schema_name, chunk_size=chunk_size) diff --git a/dlt/helpers/streamlit_app/widgets/__init__.py b/dlt/helpers/streamlit_app/widgets/__init__.py new file mode 100644 index 0000000000..349d58166e --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/__init__.py @@ -0,0 +1,6 @@ +from dlt.helpers.streamlit_app.widgets.logo import logo +from dlt.helpers.streamlit_app.widgets.stats import stat +from dlt.helpers.streamlit_app.widgets.summary import pipeline_summary +from dlt.helpers.streamlit_app.widgets.tags import tag +from dlt.helpers.streamlit_app.widgets.schema import schema_picker +from dlt.helpers.streamlit_app.widgets.color_mode_selector import mode_selector diff --git a/dlt/helpers/streamlit_app/widgets/color_mode_selector.py b/dlt/helpers/streamlit_app/widgets/color_mode_selector.py new file mode 100644 index 0000000000..fba3231a34 --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/color_mode_selector.py @@ -0,0 +1,34 @@ +import streamlit as st + +from typing_extensions import Callable, Literal + +from dlt.helpers.streamlit_app.theme import dark_theme, light_theme + +ColorMode = Literal["light", "dark"] + + +def set_color_mode(mode: ColorMode) -> Callable[..., None]: + def set_mode() -> None: + st.session_state["color_mode"] = mode + if mode and mode == "dark": + dark_theme() + else: + light_theme() + + return set_mode + + 
+def mode_selector() -> None: + columns = st.columns(10) + light = columns[3] + dark = columns[5] + + # Set default theme to light if it wasn't set before + if not st.session_state.get("color_mode"): + st.session_state["color_mode"] = "light" + st.config.set_option("theme.base", "light") + + with light: + st.button("☀ī¸", on_click=set_color_mode("light")) + with dark: + st.button("🌚", on_click=set_color_mode("dark")) diff --git a/dlt/helpers/streamlit_app/widgets/logo.py b/dlt/helpers/streamlit_app/widgets/logo.py new file mode 100644 index 0000000000..41a5afff44 --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/logo.py @@ -0,0 +1,46 @@ +import streamlit as st + + +def logo() -> None: + logo_text = """ + + """ + styles = """ + + """ + + st.markdown(logo_text + styles, unsafe_allow_html=True) diff --git a/dlt/helpers/streamlit_app/widgets/schema.py b/dlt/helpers/streamlit_app/widgets/schema.py new file mode 100644 index 0000000000..f7883bc45e --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/schema.py @@ -0,0 +1,21 @@ +import dlt +import streamlit as st + + +def schema_picker(pipeline: dlt.Pipeline) -> None: + schema = None + num_schemas = len(pipeline.schema_names) + if num_schemas == 1: + schema_name = pipeline.schema_names[0] + schema = pipeline.schemas.get(schema_name) + elif num_schemas > 1: + text = "Select schema" + selected_schema_name = st.selectbox( + text, + sorted(pipeline.schema_names), + ) + schema = pipeline.schemas.get(selected_schema_name) + + if schema: + st.subheader(f"Schema: {schema.name}") + st.session_state["schema"] = schema diff --git a/dlt/helpers/streamlit_app/widgets/stats.py b/dlt/helpers/streamlit_app/widgets/stats.py new file mode 100644 index 0000000000..d0fded508b --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/stats.py @@ -0,0 +1,58 @@ +from typing import Any, Optional +import streamlit as st + + +def stat( + label: str, + value: Any, + width: Optional[str] = "100%", + display: Optional[str] = "inline-block", + 
background_color: Optional[str] = "#0e1111", + border_radius: Optional[int] = 4, + border_color: Optional[str] = "#272736", + border_left_color: Optional[str] = "#007b05", + border_left_width: Optional[int] = 0, +) -> None: + stat_html = f""" +
+    <div class="stat">
+        <p class="stat-label">
+            {label}
+        </p>
+        <p class="stat-value">
+            {value}
+        </p>
+    </div>
+ """ + mode = st.session_state.get("color_mode", "dark") + if mode == "light": + background_color = "#FEFEFE" + border_left_color = "#333333" + + styles = """ + .stat { + display: %s; + width: %s; + border-radius: %dpx; + border: 1px solid %s; + background-color: %s; + padding: 2%% 2%% 1%% 5%%; + margin-bottom: 2%%; + } + .stat-label { + font-size: 14px; + margin-bottom: 5px; + } + .stat-value { + font-size: 32px; + margin-bottom: 0; + } + %s + """ % (display, width, border_radius, border_color, background_color, "") + + if border_left_width > 1: + styles += """ + .stat { + border-left: %dpx solid %s !important; + } + """ % (border_left_width, border_left_color) + + st.markdown( + stat_html + f"", + unsafe_allow_html=True, + ) diff --git a/dlt/helpers/streamlit_app/widgets/summary.py b/dlt/helpers/streamlit_app/widgets/summary.py new file mode 100644 index 0000000000..afbefbe608 --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/summary.py @@ -0,0 +1,21 @@ +import dlt +import streamlit as st +from dlt.pipeline.exceptions import SqlClientNotAvailable + + +def pipeline_summary(pipeline: dlt.Pipeline) -> None: + try: + credentials = pipeline.sql_client().credentials + except SqlClientNotAvailable: + credentials = "---" + st.error("🚨 Cannot load data - SqlClient not available") + + schema_names = ", ".join(sorted(pipeline.schema_names)) + st.subheader("Pipeline info", divider=True) + st.markdown(f""" + * pipeline name: **{pipeline.pipeline_name}** + * destination: **{str(credentials)}** in **{pipeline.destination.destination_description}** + * dataset name: **{pipeline.dataset_name}** + * default schema name: **{pipeline.default_schema_name}** + * all schema names: **{schema_names}** + """) diff --git a/dlt/helpers/streamlit_app/widgets/tags.py b/dlt/helpers/streamlit_app/widgets/tags.py new file mode 100644 index 0000000000..a591e50efe --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/tags.py @@ -0,0 +1,41 @@ +from typing import Optional, Literal + +import 
streamlit as st + +TagType = Literal["info", "success", "warning", "error", "mute"] + + +def tag( + tag_name: str, + label: Optional[str] = None, + border_radius: Optional[int] = 4, + bold: Optional[bool] = False, + tag_type: Optional[TagType] = "mute", +) -> None: + tag_html = f""" + {str(label)+": " if label else ""}{tag_name} + """ + kinds = { + "mute": {"text_color": "#495057", "bg_color": "#e9ecef"}, + "info": {"text_color": "#1864ab", "bg_color": "#4dabf7"}, + "success": {"text_color": "#2b8a3e", "bg_color": "#8ce99a"}, + "warning": {"text_color": "#d9480f", "bg_color": "#ffa94d"}, + "error": {"text_color": "#c92a2a", "bg_color": "#ffe3e3"}, + } + kind = kinds[tag_type] + bg_color = kind["bg_color"] + text_color = kind["text_color"] + + styles = """ + + """ % (border_radius, bg_color, text_color, "600" if bold else "normal") + + st.markdown(tag_html + styles, unsafe_allow_html=True) diff --git a/tests/helpers/streamlit_tests/test_streamlit_show_resources.py b/tests/helpers/streamlit_tests/test_streamlit_show_resources.py index a26e9b774d..dd807260fe 100644 --- a/tests/helpers/streamlit_tests/test_streamlit_show_resources.py +++ b/tests/helpers/streamlit_tests/test_streamlit_show_resources.py @@ -5,9 +5,23 @@ dlt pipeline test_resources_pipeline show """ +import os +import sys +from pathlib import Path + +import pytest import dlt +from streamlit.testing.v1 import AppTest # type: ignore + +from dlt.helpers.streamlit_app.utils import render_with_pipeline +from dlt.pipeline.exceptions import CannotRestorePipelineException + +here = Path(__file__).parent +dlt_root = here.parent.parent.parent.absolute() +streamlit_app_path = dlt_root / "dlt/helpers/streamlit_app" + @dlt.source def source1(nr): @@ -25,13 +39,25 @@ def get_resource(nr): yield resource -@dlt.source +@dlt.source() def source2(nr): def get_resource2(nr): for i in range(nr): yield {"id": i, "column_2": f"xyz_{i}"} - def get_resource3(nr): + @dlt.resource( + name="Three", + write_disposition="merge", + 
primary_key=["column_3", "column_4"], + merge_key=["column_3"], + ) + def get_resource3( + nr, + id_inc: dlt.sources.incremental[int] = dlt.sources.incremental( + "id", + initial_value=0, + ), + ): for i in range(nr): yield {"id": i, "column_3": f"pqr_{i}", "column_4": f"pqrr_{i}"} @@ -42,28 +68,97 @@ def get_resource3(nr): primary_key="column_2", merge_key=["column_2"], ) - yield dlt.resource( - get_resource3(nr), - name="Three", - write_disposition="merge", - primary_key=["column_3", "column_4"], - merge_key=["column_3"], - ) + yield get_resource3(nr) def test_multiple_resources_pipeline(): pipeline = dlt.pipeline( - pipeline_name="test_resources_pipeline", destination="duckdb", dataset_name="rows_data2" + pipeline_name="test_resources_pipeline", + destination="duckdb", + dataset_name="rows_data2", ) load_info = pipeline.run([source1(10), source2(20)]) source1_schema = load_info.pipeline.schemas.get("source1") - assert load_info.pipeline.schema_names == ["source2", "source1"] # type: ignore[attr-defined] + assert set(load_info.pipeline.schema_names) == set(["source2", "source1"]) # type: ignore[attr-defined] assert source1_schema.data_tables()[0]["name"] == "one" assert source1_schema.data_tables()[0]["columns"]["column_1"].get("primary_key") is True assert source1_schema.data_tables()[0]["columns"]["column_1"].get("merge_key") is True assert source1_schema.data_tables()[0]["write_disposition"] == "merge" + os.environ["DLT_TEST_PIPELINE_NAME"] = "test_resources_pipeline" + streamlit_app = AppTest.from_file(str(streamlit_app_path / "index.py"), default_timeout=5) + streamlit_app.run() + assert not streamlit_app.exception + + # Check color mode switching updates session stats + streamlit_app.sidebar.button[0].click().run() + assert not streamlit_app.exception + assert streamlit_app.session_state["color_mode"] == "light" + + streamlit_app.sidebar.button[1].click().run() + assert not streamlit_app.exception + assert streamlit_app.session_state["color_mode"] == "dark" 
+ + # Check page links in sidebar + assert "Explore data" in streamlit_app.sidebar[2].label + assert "Load info" in streamlit_app.sidebar[3].label + + # Check that at least 4 content sections rendered + assert len(streamlit_app.subheader) > 4 + + # Check Explore data page + assert streamlit_app.subheader[0].value == "Schemas and tables" + assert streamlit_app.subheader[1].value == "Schema: source1" + assert streamlit_app.subheader[2].value == "Table: one" + assert streamlit_app.subheader[3].value == "Run your query" + assert streamlit_app.subheader[4].value == "Pipeline info" + + +def test_multiple_resources_pipeline_with_dummy_destination(): + pipeline = dlt.pipeline( + pipeline_name="test_resources_pipeline_dummy_destination", + destination="dummy", + dataset_name="rows_data2", + ) + pipeline.run([source1(10), source2(20)]) + + os.environ["DLT_TEST_PIPELINE_NAME"] = "test_resources_pipeline_dummy_destination" + streamlit_app = AppTest.from_file( + str(streamlit_app_path / "index.py"), + # bigger timeout because dlt might be slow at + # loading stage for dummy destination and timeout + default_timeout=8, + ) + streamlit_app.run() + + assert not streamlit_app.exception + + # We should have at least 2 errors: one on the sidebar + # and the other two errors in the page for missing sql client + assert streamlit_app.error.len >= 2 + + +def test_render_with_pipeline_with_different_pipeline_dirs(): + pipeline = dlt.pipeline( + pipeline_name="test_resources_pipeline_dummy_destination", + destination="dummy", + ) + pipeline.run([{"n": 1}, {"n": 2}], table_name="numbers") + os.environ["DLT_TEST_PIPELINE_NAME"] = "test_resources_pipeline_dummy_destination" + base_args = ["dlt-show", "pipeline_name", "--pipelines-dir"] + + def dummy_render(pipeline: dlt.Pipeline) -> None: + pass + + old_args = sys.argv[:] + with pytest.raises(CannotRestorePipelineException): + sys.argv = [*base_args, "/run/dlt"] + render_with_pipeline(dummy_render) + + with
pytest.raises(CannotRestorePipelineException): + sys.argv = [*base_args, "/tmp/dlt"] + render_with_pipeline(dummy_render) - # The rest should be inspected using the streamlit tool. + sys.argv = old_args