diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 67b172db5e..317124f8c8 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -21,8 +21,7 @@ jobs: needs: get_docs_changes if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' strategy: - fail-fast: false - max-parallel: 14 + fail-fast: true matrix: os: - ubuntu-latest diff --git a/dlt/cli/_dlt.py b/dlt/cli/_dlt.py index 9894227046..2332c0286c 100644 --- a/dlt/cli/_dlt.py +++ b/dlt/cli/_dlt.py @@ -443,6 +443,12 @@ def main() -> int: pipe_cmd.add_argument( "--list-pipelines", "-l", default=False, action="store_true", help="List local pipelines" ) + pipe_cmd.add_argument( + "--hot-reload", + default=False, + action="store_true", + help="Reload streamlit app (for core development)", + ) pipe_cmd.add_argument("pipeline_name", nargs="?", help="Pipeline name") pipe_cmd.add_argument("--pipelines-dir", help="Pipelines working directory", default=None) pipe_cmd.add_argument( diff --git a/dlt/cli/pipeline_command.py b/dlt/cli/pipeline_command.py index 9981fa8493..0eb73ad7a8 100644 --- a/dlt/cli/pipeline_command.py +++ b/dlt/cli/pipeline_command.py @@ -1,5 +1,5 @@ import yaml -from typing import Any, Sequence, Tuple +from typing import Any, Optional, Sequence, Tuple import dlt from dlt.cli.exceptions import CliCommandException @@ -15,6 +15,7 @@ from dlt.cli import echo as fmt + DLT_PIPELINE_COMMAND_DOCS_URL = "https://dlthub.com/docs/reference/command-line-interface" @@ -25,6 +26,7 @@ def pipeline_command( verbosity: int, dataset_name: str = None, destination: TDestinationReferenceArg = None, + hot_reload: Optional[bool] = False, **command_kwargs: Any, ) -> None: if operation == "list": @@ -48,7 +50,8 @@ def pipeline_command( raise fmt.warning(str(e)) if not fmt.confirm( - "Do you want to attempt to restore the pipeline state from destination?", default=False + "Do you want to attempt to restore the pipeline state from destination?", + default=False, ): return destination = destination or fmt.text_input( @@ -58,7 +61,10 @@ def pipeline_command( f"Enter dataset name for pipeline {fmt.bold(pipeline_name)}" ) p = dlt.pipeline( - pipeline_name, pipelines_dir, destination=destination, dataset_name=dataset_name + pipeline_name, + pipelines_dir, + destination=destination, + dataset_name=dataset_name, ) p.sync_destination() if p.first_run: @@ -101,13 +107,29 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]: if operation == "show": from dlt.common.runtime import signals - from dlt.helpers import streamlit_helper + from dlt.helpers.streamlit_app import index with signals.delayed_signals(): + streamlit_cmd = [ + "streamlit", + "run", + index.__file__, + "--client.showSidebarNavigation", + "false", + ] + + if hot_reload: + streamlit_cmd.append("--server.runOnSave") + streamlit_cmd.append("true") + + streamlit_cmd.append("--") + streamlit_cmd.append(pipeline_name) + if pipelines_dir: + streamlit_cmd.append("--pipelines-dir") + streamlit_cmd.append(pipelines_dir) + venv = Venv.restore_current() - for line in iter_stdout( - venv, "streamlit", "run", streamlit_helper.__file__, pipeline_name - ): + for line in iter_stdout(venv, *streamlit_cmd): fmt.echo(line) if operation == "info": @@ -255,7 +277,12 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]: tables = remove_defaults({"tables": package_info.schema_update}) # type: ignore fmt.echo(fmt.bold("Schema update:")) fmt.echo( - yaml.dump(tables, allow_unicode=True, default_flow_style=False, sort_keys=False) + yaml.dump( + tables, + allow_unicode=True, + default_flow_style=False, + sort_keys=False, + ) ) if operation == "schema": @@ -288,20 +315,33 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]: fmt.echo( "About to drop the following data in dataset %s in destination %s:" - % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.destination_name)) + % ( + fmt.bold(drop.info["dataset_name"]), + fmt.bold(p.destination.destination_name), + ) ) fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"])) fmt.echo( - "%s: %s" % (fmt.style("Selected resource(s)", fg="green"), drop.info["resource_names"]) + "%s: %s" + % ( + fmt.style("Selected resource(s)", fg="green"), + drop.info["resource_names"], + ) ) fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), drop.info["tables"])) fmt.echo( "%s: %s" - % (fmt.style("Resource(s) state to reset", fg="green"), drop.info["resource_states"]) + % ( + fmt.style("Resource(s) state to reset", fg="green"), + drop.info["resource_states"], + ) ) fmt.echo( "%s: %s" - % (fmt.style("Source state path(s) to reset", fg="green"), drop.info["state_paths"]) + % ( + fmt.style("Source state path(s) to reset", fg="green"), + drop.info["state_paths"], + ) ) # for k, v in drop.info.items(): # fmt.echo("%s: %s" % (fmt.style(k, fg="green"), v)) diff --git a/dlt/helpers/streamlit_app/__init__.py b/dlt/helpers/streamlit_app/__init__.py new file mode 100644 index 0000000000..b304195a5a --- /dev/null +++ b/dlt/helpers/streamlit_app/__init__.py @@ -0,0 +1,11 @@ +from dlt.common.exceptions import MissingDependencyException + +# FIXME: Remove this after implementing package installer +try: + import streamlit +except ModuleNotFoundError: + raise MissingDependencyException( + "DLT Streamlit Helpers", + ["streamlit"], + "DLT Helpers for Streamlit should be run within a streamlit app.", + ) diff --git a/dlt/helpers/streamlit_app/blocks/__init__.py b/dlt/helpers/streamlit_app/blocks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/helpers/streamlit_app/blocks/load_info.py b/dlt/helpers/streamlit_app/blocks/load_info.py new file mode 100644 index 0000000000..134b5ad5a4 --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/load_info.py @@ -0,0 +1,40 @@ +import dlt +import humanize +import streamlit as st + +from dlt.common import pendulum +from dlt.helpers.streamlit_app.utils import query_data_live +from dlt.helpers.streamlit_app.widgets import stat + + +def last_load_info(pipeline: dlt.Pipeline) -> None: + loads_df = query_data_live( + pipeline, + f"SELECT load_id, inserted_at FROM {pipeline.default_schema.loads_table_name} WHERE" + " status = 0 ORDER BY inserted_at DESC LIMIT 101 ", + ) + + if loads_df is None: + st.error( + "Load info is not available", + icon="đ¨", + ) + else: + loads_no = loads_df.shape[0] + if loads_df.shape[0] > 0: + rel_time = ( + humanize.naturaldelta( + pendulum.now() - pendulum.from_timestamp(loads_df.iloc[0, 1].timestamp()) + ) + + " ago" + ) + last_load_id = loads_df.iloc[0, 0] + if loads_no > 100: + loads_no = "> " + str(loads_no) + else: + rel_time = "---" + last_load_id = "---" + + stat("Last load time", rel_time, border_left_width=4) + stat("Last load id", last_load_id) + stat("Total number of loads", loads_no) diff --git a/dlt/helpers/streamlit_app/blocks/menu.py b/dlt/helpers/streamlit_app/blocks/menu.py new file mode 100644 index 0000000000..b6d0b5f7aa --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/menu.py @@ -0,0 +1,14 @@ +import dlt +import streamlit as st + +from dlt.helpers.streamlit_app.utils import HERE +from dlt.helpers.streamlit_app.widgets import logo, mode_selector +from dlt.helpers.streamlit_app.widgets import pipeline_summary + + +def menu(pipeline: dlt.Pipeline) -> None: + mode_selector() + logo() + st.page_link(f"{HERE}/pages/dashboard.py", label="Explore data", icon="đšī¸") + st.page_link(f"{HERE}/pages/load_info.py", label="Load info", icon="đž") + pipeline_summary(pipeline) diff --git a/dlt/helpers/streamlit_app/blocks/query.py b/dlt/helpers/streamlit_app/blocks/query.py new file mode 100644 index 0000000000..a03e9a0cd9 --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/query.py @@ -0,0 +1,57 @@ +from typing import Optional +import dlt +import streamlit as st + +from dlt.common.exceptions import MissingDependencyException +from dlt.helpers.streamlit_app.utils import query_data + + +def maybe_run_query( + pipeline: dlt.Pipeline, + show_charts: bool = True, + example_query: Optional[str] = "", +) -> None: + st.subheader("Run your query") + sql_query = st.text_area("Enter your SQL query", value=example_query) + if st.button("Run Query"): + if sql_query: + try: + # run the query from the text area + df = query_data(pipeline, sql_query, chunk_size=2048) + if df is None: + st.text("No rows returned") + else: + rows_count = df.shape[0] + st.text(f"{rows_count} row(s) returned") + st.dataframe(df) + try: + # now if the dataset has supported shape try to display the bar or altair chart + if df.dtypes.shape[0] == 1 and show_charts: + # try barchart + st.bar_chart(df) + if df.dtypes.shape[0] == 2 and show_charts: + # try to import altair charts + try: + import altair as alt + except ModuleNotFoundError: + raise MissingDependencyException( + "DLT Streamlit Helpers", + ["altair"], + "DLT Helpers for Streamlit should be run within a streamlit" + " app.", + ) + + # try altair + bar_chart = ( + alt.Chart(df) + .mark_bar() + .encode( + x=f"{df.columns[1]}:Q", y=alt.Y(f"{df.columns[0]}:N", sort="-x") + ) + ) + st.altair_chart(bar_chart, use_container_width=True) + except Exception as ex: + st.error(f"Chart failed due to: {ex}") + except Exception as ex: + st.text("Exception when running query") + st.exception(ex) diff --git a/dlt/helpers/streamlit_app/blocks/resource_state.py b/dlt/helpers/streamlit_app/blocks/resource_state.py new file mode 100644 index 0000000000..8ea1256a1f --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/resource_state.py @@ -0,0 +1,29 @@ +import dlt +import streamlit as st +import yaml + +from dlt.common import json +from dlt.common.libs.pandas import pandas as pd +from dlt.common.pipeline import resource_state, TSourceState +from dlt.common.schema.utils import group_tables_by_resource +from dlt.helpers.streamlit_app.widgets.tags import tag + + +def resource_state_info( + pipeline: dlt.Pipeline, + schema_name: str, + resource_name: str, +) -> None: + sources_state = pipeline.state.get("sources") or {} + schema = sources_state.get(schema_name) + if not schema: + st.error(f"Schema with name: {schema_name} is not found") + return + + resource = schema["resources"].get(resource_name) + with st.expander("Resource state", expanded=(resource is None)): + if not resource: + st.info(f"{resource_name} is missing resource state") + else: + spec = yaml.safe_dump(resource) + st.code(spec, language="yaml") diff --git a/dlt/helpers/streamlit_app/blocks/show_data.py b/dlt/helpers/streamlit_app/blocks/show_data.py new file mode 100644 index 0000000000..7aaab084f3 --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/show_data.py @@ -0,0 +1,21 @@ +from typing import List + +import dlt +import streamlit as st + +from dlt.helpers.streamlit_app.utils import query_data + + +def show_data_button(pipeline: dlt.Pipeline, table_name: str) -> None: + if st.button("SHOW DATA", key=table_name): + df = query_data(pipeline, f"SELECT * FROM {table_name}", chunk_size=2048) + if df is None: + st.text("No rows returned") + else: + rows_count = df.shape[0] + if df.shape[0] < 2048: + st.text(f"All {rows_count} row(s)") + else: + st.text(f"Top {rows_count} row(s)") + + st.dataframe(df) diff --git a/dlt/helpers/streamlit_app/blocks/table_hints.py b/dlt/helpers/streamlit_app/blocks/table_hints.py new file mode 100644 index 0000000000..aefab952e5 --- /dev/null +++ b/dlt/helpers/streamlit_app/blocks/table_hints.py @@ -0,0 +1,80 @@ +from typing import Any, Dict, List + +import dlt +import streamlit as st + +from dlt.common.schema.typing import TTableSchema +from dlt.common.utils import flatten_list_or_items +from dlt.helpers.streamlit_app.blocks.resource_state import resource_state_info +from dlt.helpers.streamlit_app.blocks.show_data import show_data_button + + +def list_table_hints(pipeline: dlt.Pipeline, tables: List[TTableSchema]) -> None: + current_schema = st.session_state["schema"] or pipeline.default_schema + if st.session_state["schema"]: + current_schema = st.session_state["schema"] + + for table in tables: + table_hints: List[str] = [] + if "parent" in table: + table_hints.append("parent: **%s**" % table["parent"]) + + if "resource" in table: + table_hints.append("resource: **%s**" % table["resource"]) + + if "write_disposition" in table: + table_hints.append("write disposition: **%s**" % table["write_disposition"]) + + columns = table["columns"] + primary_keys: List[str] = list( + flatten_list_or_items( + [ + col_name + for col_name in columns.keys() + if not col_name.startswith("_") + and columns[col_name].get("primary_key") is not None + ] + ) + ) + if primary_keys: + table_hints.append("primary key(s): **%s**" % ", ".join(primary_keys)) + + merge_keys = list( + flatten_list_or_items( + [ + col_name + for col_name in columns.keys() + if not col_name.startswith("_") + and not columns[col_name].get("merge_key") is None # noqa: E714 + ] + ) + ) + + if merge_keys: + table_hints.append("merge key(s): **%s**" % ", ".join(merge_keys)) + + st.subheader(f"Table: {table['name']}", divider=True) + st.markdown(" | ".join(table_hints)) + if "resource" in table: + resource_state_info( + pipeline, + current_schema.name, + table["resource"], + ) + + # table schema contains various hints (like clustering or partition options) + # that we do not want to show in basic view + def essentials_f(c: Any) -> Dict[str, Any]: + essentials: Dict[str, Any] = {} + for k, v in c.items(): + if k in ["name", "data_type", "nullable"]: + essentials[k] = v + + return { + "name": essentials["name"], + "data_type": essentials["data_type"], + "nullable": essentials["nullable"], + } + + st.table(map(essentials_f, table["columns"].values())) + show_data_button(pipeline, table["name"]) diff --git a/dlt/helpers/streamlit_app/index.py b/dlt/helpers/streamlit_app/index.py new file mode 100644 index 0000000000..31fb470640 --- /dev/null +++ b/dlt/helpers/streamlit_app/index.py @@ -0,0 +1,6 @@ +import streamlit as st + +from dlt.helpers.streamlit_app.utils import HERE + +if __name__ == "__main__": + st.switch_page(f"{HERE}/pages/dashboard.py") diff --git a/dlt/helpers/streamlit_app/pages/__init__.py b/dlt/helpers/streamlit_app/pages/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/helpers/streamlit_app/pages/dashboard.py b/dlt/helpers/streamlit_app/pages/dashboard.py new file mode 100644 index 0000000000..656dd6ecdf --- /dev/null +++ b/dlt/helpers/streamlit_app/pages/dashboard.py @@ -0,0 +1,53 @@ +import dlt +import streamlit as st + +from dlt.helpers.streamlit_app.blocks.query import maybe_run_query +from dlt.helpers.streamlit_app.blocks.table_hints import list_table_hints +from dlt.helpers.streamlit_app.blocks.menu import menu +from dlt.helpers.streamlit_app.utils import render_with_pipeline +from dlt.helpers.streamlit_app.widgets import schema_picker +from dlt.pipeline import Pipeline + + +def write_data_explorer_page( + pipeline: Pipeline, + schema_name: str = None, + example_query: str = "", + show_charts: bool = True, +) -> None: + """Writes Streamlit app page with a schema and live data preview. + + #### Args: + pipeline (Pipeline): Pipeline instance to use. + schema_name (str, optional): Name of the schema to display. If None, default schema is used. + example_query (str, optional): Example query to be displayed in the SQL Query box. + show_charts (bool, optional): Should automatically show charts for the queries from SQL Query box. Defaults to True. + + Raises: + MissingDependencyException: Raised when a particular python dependency is not installed + """ + + st.subheader("Schemas and tables", divider="rainbow") + schema_picker(pipeline) + tables = sorted( + st.session_state["schema"].data_tables(), + key=lambda table: table["name"], + ) + + list_table_hints(pipeline, tables) + maybe_run_query( + pipeline, + show_charts=show_charts, + example_query=example_query, + ) + + +def show(pipeline: dlt.Pipeline) -> None: + with st.sidebar: + menu(pipeline) + + write_data_explorer_page(pipeline) + + +if __name__ == "__main__": + render_with_pipeline(show) diff --git a/dlt/helpers/streamlit_app/pages/load_info.py b/dlt/helpers/streamlit_app/pages/load_info.py new file mode 100644 index 0000000000..ee13cf2531 --- /dev/null +++ b/dlt/helpers/streamlit_app/pages/load_info.py @@ -0,0 +1,130 @@ +import dlt +import streamlit as st + +from dlt.common.configuration.exceptions import ConfigFieldMissingException +from dlt.common.destination.reference import WithStateSync +from dlt.helpers.streamlit_app.blocks.load_info import last_load_info +from dlt.helpers.streamlit_app.blocks.menu import menu +from dlt.helpers.streamlit_app.widgets import stat +from dlt.helpers.streamlit_app.utils import ( + query_data, + query_data_live, + render_with_pipeline, +) +from dlt.pipeline import Pipeline +from dlt.pipeline.exceptions import CannotRestorePipelineException +from dlt.pipeline.state_sync import load_pipeline_state_from_destination + + +def write_load_status_page(pipeline: Pipeline) -> None: + """Display pipeline loading information.""" + + try: + loads_df = query_data_live( + pipeline, + f"SELECT load_id, inserted_at FROM {pipeline.default_schema.loads_table_name} WHERE" + " status = 0 ORDER BY inserted_at DESC LIMIT 101 ", + ) + + if loads_df is not None: + selected_load_id = st.selectbox("Select load id", loads_df) + schema = pipeline.default_schema + + st.markdown("**Number of loaded rows:**") + + # construct a union query + query_parts = [] + for table in schema.data_tables(): + if "parent" in table: + continue + table_name = table["name"] + query_parts.append( + f"SELECT '{table_name}' as table_name, COUNT(1) As rows_count FROM" + f" {table_name} WHERE _dlt_load_id = '{selected_load_id}'" + ) + query_parts.append("UNION ALL") + + query_parts.pop() + rows_counts_df = query_data(pipeline, "\n".join(query_parts)) + + st.markdown(f"Rows loaded in **{selected_load_id}**") + st.dataframe(rows_counts_df) + + st.markdown("**Last 100 loads**") + st.dataframe(loads_df) + + st.subheader("Schema updates", divider=True) + schemas_df = query_data_live( + pipeline, + "SELECT schema_name, inserted_at, version, version_hash FROM" + f" {pipeline.default_schema.version_table_name} ORDER BY inserted_at DESC LIMIT" + " 101 ", + ) + st.markdown("**100 recent schema updates**") + st.dataframe(schemas_df) + except CannotRestorePipelineException as restore_ex: + st.error("Seems like the pipeline does not exist. Did you run it at least once?") + st.exception(restore_ex) + + except ConfigFieldMissingException as cf_ex: + st.error( + "Pipeline credentials/configuration is missing. This most often happen when you run the" + " streamlit app from different folder than the `.dlt` with `toml` files resides." + ) + st.text(str(cf_ex)) + + except Exception as ex: + st.error("Pipeline info could not be prepared. Did you load the data at least once?") + st.exception(ex) + + +def show_state_versions(pipeline: dlt.Pipeline) -> None: + st.subheader("State info", divider=True) + remote_state = None + with pipeline.destination_client() as client: + if isinstance(client, WithStateSync): + remote_state = load_pipeline_state_from_destination(pipeline.pipeline_name, client) + + local_state = pipeline.state + + remote_state_version = "---" + if remote_state: + remote_state_version = str(remote_state["_state_version"]) + + col1, col2 = st.columns(2) + with col1: + stat( + label="Local version", + value=local_state["_state_version"], + display="block", + border_left_width=4, + ) + + with col2: + stat( + label="Remote version", + value=remote_state_version, + display="block", + border_left_width=4, + ) + + if remote_state_version != str(local_state["_state_version"]): + st.text("") + st.warning( + "Looks like that local state is not yet synchronized or synchronization is disabled", + icon="â ī¸", + ) + + +def show(pipeline: dlt.Pipeline) -> None: + st.subheader("Load info", divider="rainbow") + last_load_info(pipeline) + write_load_status_page(pipeline) + show_state_versions(pipeline) + + with st.sidebar: + menu(pipeline) + + +if __name__ == "__main__": + render_with_pipeline(show) diff --git a/dlt/helpers/streamlit_app/theme.py b/dlt/helpers/streamlit_app/theme.py new file mode 100644 index 0000000000..3b6b600a73 --- /dev/null +++ b/dlt/helpers/streamlit_app/theme.py @@ -0,0 +1,29 @@ +import streamlit as st + + +def dark_theme() -> None: + st.config.set_option("theme.base", "dark") + st.config.set_option("theme.primaryColor", "#191937") + + # Main background + st.config.set_option("theme.backgroundColor", "#4C4898") + + # Sidebar + st.config.set_option("theme.secondaryBackgroundColor", "#191937") + + # Text + st.config.set_option("theme.textColor", "#FEFEFA") + + +def light_theme() -> None: + st.config.set_option("theme.base", "light") + st.config.set_option("theme.primaryColor", "#333") + + # Main background + st.config.set_option("theme.backgroundColor", "#FEFEFE") + + # Sidebar + st.config.set_option("theme.secondaryBackgroundColor", "#ededed") + + # Text + st.config.set_option("theme.textColor", "#333") diff --git a/dlt/helpers/streamlit_app/utils.py b/dlt/helpers/streamlit_app/utils.py new file mode 100644 index 0000000000..6b2dab495c --- /dev/null +++ b/dlt/helpers/streamlit_app/utils.py @@ -0,0 +1,77 @@ +import argparse +import os + +from pathlib import Path +from typing import Optional, Callable, Tuple + +import dlt +import pandas as pd +import streamlit as st + +from dlt.cli import echo as fmt +from dlt.pipeline.exceptions import SqlClientNotAvailable + +HERE = Path(__file__).absolute().parent + + +def parse_args() -> Tuple[str, str]: + parser = argparse.ArgumentParser() + parser.add_argument("pipeline_name", nargs=1) + parser.add_argument( + "--pipelines-dir", + help="Pipelines working directory", + default=None, + ) + known_options, _ = parser.parse_known_args() + return known_options.pipeline_name[0], known_options.pipelines_dir + + +def render_with_pipeline(render_func: Callable[..., None]) -> None: + pipeline_name, pipelines_dir = parse_args() + if test_pipeline_name := os.getenv("DLT_TEST_PIPELINE_NAME"): + fmt.echo(f"RUNNING TEST PIPELINE: {test_pipeline_name}") + pipeline_name = test_pipeline_name + + st.session_state["pipeline_name"] = pipeline_name + # use pipelines dir from env var or try to resolve it using get_dlt_pipelines_dir + pipeline = dlt.attach(pipeline_name, pipelines_dir=pipelines_dir) + render_func(pipeline) + + +def query_using_cache( + pipeline: dlt.Pipeline, ttl: int +) -> Callable[..., Optional[pd.DataFrame]]: + @st.cache_data(ttl=ttl) + def do_query( # type: ignore[return] + query: str, + schema_name: str = None, + chunk_size: Optional[int] = None, + ) -> Optional[pd.DataFrame]: + try: + with pipeline.sql_client(schema_name) as client: + with client.execute_query(query) as curr: + return curr.df(chunk_size=chunk_size) + except SqlClientNotAvailable: + st.error("đ¨ Cannot load data - SqlClient not available") + + return do_query # type: ignore + + +def query_data( + pipeline: dlt.Pipeline, + query: str, + schema_name: str = None, + chunk_size: Optional[int] = None, +) -> pd.DataFrame: + query_maker = query_using_cache(pipeline, ttl=600) + return query_maker(query, schema_name, chunk_size=chunk_size) + + +def query_data_live( + pipeline: dlt.Pipeline, + query: str, + schema_name: str = None, + chunk_size: Optional[int] = None, +) -> pd.DataFrame: + query_maker = query_using_cache(pipeline, ttl=5) + return query_maker(query, schema_name, chunk_size=chunk_size) diff --git a/dlt/helpers/streamlit_app/widgets/__init__.py b/dlt/helpers/streamlit_app/widgets/__init__.py new file mode 100644 index 0000000000..349d58166e --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/__init__.py @@ -0,0 +1,6 @@ +from dlt.helpers.streamlit_app.widgets.logo import logo +from dlt.helpers.streamlit_app.widgets.stats import stat +from dlt.helpers.streamlit_app.widgets.summary import pipeline_summary +from dlt.helpers.streamlit_app.widgets.tags import tag +from dlt.helpers.streamlit_app.widgets.schema import schema_picker +from dlt.helpers.streamlit_app.widgets.color_mode_selector import mode_selector diff --git a/dlt/helpers/streamlit_app/widgets/color_mode_selector.py b/dlt/helpers/streamlit_app/widgets/color_mode_selector.py new file mode 100644 index 0000000000..fba3231a34 --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/color_mode_selector.py @@ -0,0 +1,34 @@ +import streamlit as st + +from typing_extensions import Callable, Literal + +from dlt.helpers.streamlit_app.theme import dark_theme, light_theme + +ColorMode = Literal["light", "dark"] + + +def set_color_mode(mode: ColorMode) -> Callable[..., None]: + def set_mode() -> None: + st.session_state["color_mode"] = mode + if mode and mode == "dark": + dark_theme() + else: + light_theme() + + return set_mode + + +def mode_selector() -> None: + columns = st.columns(10) + light = columns[3] + dark = columns[5] + + # Set default theme to light if it wasn't set before + if not st.session_state.get("color_mode"): + st.session_state["color_mode"] = "light" + st.config.set_option("theme.base", "light") + + with light: + st.button("âī¸", on_click=set_color_mode("light")) + with dark: + st.button("đ", on_click=set_color_mode("dark")) diff --git a/dlt/helpers/streamlit_app/widgets/logo.py b/dlt/helpers/streamlit_app/widgets/logo.py new file mode 100644 index 0000000000..41a5afff44 --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/logo.py @@ -0,0 +1,46 @@ +import streamlit as st + + +def logo() -> None: + logo_text = """ +
{label}
+{value}
+