Skip to content

Commit

Permalink
Streamlit improvements (#1060)
Browse files Browse the repository at this point in the history
* Streamlit improvements

* Refactor menu and create custom widgets

* Add tag component

* Add destination name to sidebar

* Adjust headers

* Set color scheme

* Fix linting issues

* Move import dlt to the top

* Use defaul tag type

* Use lower contrast shade of white

* Use smaller heading element

* Handle dummy destination errors and implement custom light and dark modes

* Cleanup streamlit app code

* Fix linting issues

* Fix linting issues

* Extract components from streamlit explorer code

* Cleanup redundant data display

* Add tag bold text option

* Cast label to string

* Rework document sections and display resource incremental if given

* Do not display primary and merge keys if they are not specified

* Integrate streamlit app tests

* Fix mypy issue

* Add general rendering test checks for streamlit

* Set default color mode as light

* Display resource state in expandable area

* More rendering checks

* Cleanup tests

* Add test case for streamlit app with dummy dlt destination

* Ignore B015 error

* Fix linting errors

* Add hot reload support for streamlit via DLT_STREAMLIT_HOT_RELOAD variable

* Move non-essential info from sidebar to load info page

* Expand pipeline summary block by default

* Sort table by column names

* Fix mypy errors

* Pass pipelines dir and use bool argument for streamlit hot reload

* Keep resource state expanded if it is missing the state

* Use DLT_PIPELINES_DIR in load info as well

* Remove unused import

* Do not sort table column

* Extract pipeline attaching logic

* Use pipeline name from cli arguments for load_info page

* Move pipeline_state_info into blocks

* Remove comment

* Move menu into blocks

* Extract pipeline loading logic

* Show simple unordered list with pipeline summary

* Cleanup redundant code

* Adjust tests

* Remove unused code

* Move dashboard into pages

* Refactor querying logic and stop using deprecated experimental caching for streamlit

* Fix mypy errors

* Use get_dlt_data_dir to resolve pipelines_dir

* Add more tests and checks

* Pass DLT_PIPELINES_DIR instead of modifying DLT_DATA_DIR

* Restore os environment after streamlit app exits

* Remove max-parallel for linting

* Allow linting to fail fast and fix linting errors

* Fix mypy errors

* Format code

* Show message when pipelines dir is passed

* Show message if pipelines_dir is passed

* Copy streamlit package check to streamlit_app module __init__

* Adjust mypy ignore

* Fix mypy issues

* moves load info around

* Pass pipeline params after terminator

* Remove info message when pipelines dir is passed

* Restor system args after test

* Fix linting error

* Adjust streamlit command arguments passing

* Add comments

* Manually enforce column order

---------

Co-authored-by: Marcin Rudolf <[email protected]>
  • Loading branch information
sultaniman and rudolfix authored Mar 21, 2024
1 parent 1f2b4ce commit 3a815bc
Show file tree
Hide file tree
Showing 25 changed files with 940 additions and 26 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ jobs:
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
max-parallel: 14
fail-fast: true
matrix:
os:
- ubuntu-latest
Expand Down
6 changes: 6 additions & 0 deletions dlt/cli/_dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,12 @@ def main() -> int:
pipe_cmd.add_argument(
"--list-pipelines", "-l", default=False, action="store_true", help="List local pipelines"
)
pipe_cmd.add_argument(
"--hot-reload",
default=False,
action="store_true",
help="Reload streamlit app (for core development)",
)
pipe_cmd.add_argument("pipeline_name", nargs="?", help="Pipeline name")
pipe_cmd.add_argument("--pipelines-dir", help="Pipelines working directory", default=None)
pipe_cmd.add_argument(
Expand Down
64 changes: 52 additions & 12 deletions dlt/cli/pipeline_command.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import yaml
from typing import Any, Sequence, Tuple
from typing import Any, Optional, Sequence, Tuple
import dlt
from dlt.cli.exceptions import CliCommandException

Expand All @@ -15,6 +15,7 @@

from dlt.cli import echo as fmt


DLT_PIPELINE_COMMAND_DOCS_URL = "https://dlthub.com/docs/reference/command-line-interface"


Expand All @@ -25,6 +26,7 @@ def pipeline_command(
verbosity: int,
dataset_name: str = None,
destination: TDestinationReferenceArg = None,
hot_reload: Optional[bool] = False,
**command_kwargs: Any,
) -> None:
if operation == "list":
Expand All @@ -48,7 +50,8 @@ def pipeline_command(
raise
fmt.warning(str(e))
if not fmt.confirm(
"Do you want to attempt to restore the pipeline state from destination?", default=False
"Do you want to attempt to restore the pipeline state from destination?",
default=False,
):
return
destination = destination or fmt.text_input(
Expand All @@ -58,7 +61,10 @@ def pipeline_command(
f"Enter dataset name for pipeline {fmt.bold(pipeline_name)}"
)
p = dlt.pipeline(
pipeline_name, pipelines_dir, destination=destination, dataset_name=dataset_name
pipeline_name,
pipelines_dir,
destination=destination,
dataset_name=dataset_name,
)
p.sync_destination()
if p.first_run:
Expand Down Expand Up @@ -101,13 +107,29 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:

if operation == "show":
from dlt.common.runtime import signals
from dlt.helpers import streamlit_helper
from dlt.helpers.streamlit_app import index

with signals.delayed_signals():
streamlit_cmd = [
"streamlit",
"run",
index.__file__,
"--client.showSidebarNavigation",
"false",
]

if hot_reload:
streamlit_cmd.append("--server.runOnSave")
streamlit_cmd.append("true")

streamlit_cmd.append("--")
streamlit_cmd.append(pipeline_name)
if pipelines_dir:
streamlit_cmd.append("--pipelines-dir")
streamlit_cmd.append(pipelines_dir)

venv = Venv.restore_current()
for line in iter_stdout(
venv, "streamlit", "run", streamlit_helper.__file__, pipeline_name
):
for line in iter_stdout(venv, *streamlit_cmd):
fmt.echo(line)

if operation == "info":
Expand Down Expand Up @@ -255,7 +277,12 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
tables = remove_defaults({"tables": package_info.schema_update}) # type: ignore
fmt.echo(fmt.bold("Schema update:"))
fmt.echo(
yaml.dump(tables, allow_unicode=True, default_flow_style=False, sort_keys=False)
yaml.dump(
tables,
allow_unicode=True,
default_flow_style=False,
sort_keys=False,
)
)

if operation == "schema":
Expand Down Expand Up @@ -288,20 +315,33 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:

fmt.echo(
"About to drop the following data in dataset %s in destination %s:"
% (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.destination_name))
% (
fmt.bold(drop.info["dataset_name"]),
fmt.bold(p.destination.destination_name),
)
)
fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"]))
fmt.echo(
"%s: %s" % (fmt.style("Selected resource(s)", fg="green"), drop.info["resource_names"])
"%s: %s"
% (
fmt.style("Selected resource(s)", fg="green"),
drop.info["resource_names"],
)
)
fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), drop.info["tables"]))
fmt.echo(
"%s: %s"
% (fmt.style("Resource(s) state to reset", fg="green"), drop.info["resource_states"])
% (
fmt.style("Resource(s) state to reset", fg="green"),
drop.info["resource_states"],
)
)
fmt.echo(
"%s: %s"
% (fmt.style("Source state path(s) to reset", fg="green"), drop.info["state_paths"])
% (
fmt.style("Source state path(s) to reset", fg="green"),
drop.info["state_paths"],
)
)
# for k, v in drop.info.items():
# fmt.echo("%s: %s" % (fmt.style(k, fg="green"), v))
Expand Down
11 changes: 11 additions & 0 deletions dlt/helpers/streamlit_app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dlt.common.exceptions import MissingDependencyException

# FIXME: Remove this after implementing package installer
try:
import streamlit
except ModuleNotFoundError:
raise MissingDependencyException(
"DLT Streamlit Helpers",
["streamlit"],
"DLT Helpers for Streamlit should be run within a streamlit app.",
)
Empty file.
40 changes: 40 additions & 0 deletions dlt/helpers/streamlit_app/blocks/load_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import dlt
import humanize
import streamlit as st

from dlt.common import pendulum
from dlt.helpers.streamlit_app.utils import query_data_live
from dlt.helpers.streamlit_app.widgets import stat


def last_load_info(pipeline: dlt.Pipeline) -> None:
loads_df = query_data_live(
pipeline,
f"SELECT load_id, inserted_at FROM {pipeline.default_schema.loads_table_name} WHERE"
" status = 0 ORDER BY inserted_at DESC LIMIT 101 ",
)

if loads_df is None:
st.error(
"Load info is not available",
icon="🚨",
)
else:
loads_no = loads_df.shape[0]
if loads_df.shape[0] > 0:
rel_time = (
humanize.naturaldelta(
pendulum.now() - pendulum.from_timestamp(loads_df.iloc[0, 1].timestamp())
)
+ " ago"
)
last_load_id = loads_df.iloc[0, 0]
if loads_no > 100:
loads_no = "> " + str(loads_no)
else:
rel_time = "---"
last_load_id = "---"

stat("Last load time", rel_time, border_left_width=4)
stat("Last load id", last_load_id)
stat("Total number of loads", loads_no)
14 changes: 14 additions & 0 deletions dlt/helpers/streamlit_app/blocks/menu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import dlt
import streamlit as st

from dlt.helpers.streamlit_app.utils import HERE
from dlt.helpers.streamlit_app.widgets import logo, mode_selector
from dlt.helpers.streamlit_app.widgets import pipeline_summary


def menu(pipeline: dlt.Pipeline) -> None:
mode_selector()
logo()
st.page_link(f"{HERE}/pages/dashboard.py", label="Explore data", icon="🕹️")
st.page_link(f"{HERE}/pages/load_info.py", label="Load info", icon="💾")
pipeline_summary(pipeline)
57 changes: 57 additions & 0 deletions dlt/helpers/streamlit_app/blocks/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from typing import Optional
import dlt
import streamlit as st

from dlt.common.exceptions import MissingDependencyException
from dlt.helpers.streamlit_app.utils import query_data


def maybe_run_query(
pipeline: dlt.Pipeline,
show_charts: bool = True,
example_query: Optional[str] = "",
) -> None:
st.subheader("Run your query")
sql_query = st.text_area("Enter your SQL query", value=example_query)
if st.button("Run Query"):
if sql_query:
try:
# run the query from the text area
df = query_data(pipeline, sql_query, chunk_size=2048)
if df is None:
st.text("No rows returned")
else:
rows_count = df.shape[0]
st.text(f"{rows_count} row(s) returned")
st.dataframe(df)
try:
# now if the dataset has supported shape try to display the bar or altair chart
if df.dtypes.shape[0] == 1 and show_charts:
# try barchart
st.bar_chart(df)
if df.dtypes.shape[0] == 2 and show_charts:
# try to import altair charts
try:
import altair as alt
except ModuleNotFoundError:
raise MissingDependencyException(
"DLT Streamlit Helpers",
["altair"],
"DLT Helpers for Streamlit should be run within a streamlit"
" app.",
)

# try altair
bar_chart = (
alt.Chart(df)
.mark_bar()
.encode(
x=f"{df.columns[1]}:Q", y=alt.Y(f"{df.columns[0]}:N", sort="-x")
)
)
st.altair_chart(bar_chart, use_container_width=True)
except Exception as ex:
st.error(f"Chart failed due to: {ex}")
except Exception as ex:
st.text("Exception when running query")
st.exception(ex)
29 changes: 29 additions & 0 deletions dlt/helpers/streamlit_app/blocks/resource_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import dlt
import streamlit as st
import yaml

from dlt.common import json
from dlt.common.libs.pandas import pandas as pd
from dlt.common.pipeline import resource_state, TSourceState
from dlt.common.schema.utils import group_tables_by_resource
from dlt.helpers.streamlit_app.widgets.tags import tag


def resource_state_info(
pipeline: dlt.Pipeline,
schema_name: str,
resource_name: str,
) -> None:
sources_state = pipeline.state.get("sources") or {}
schema = sources_state.get(schema_name)
if not schema:
st.error(f"Schema with name: {schema_name} is not found")
return

resource = schema["resources"].get(resource_name)
with st.expander("Resource state", expanded=(resource is None)):
if not resource:
st.info(f"{resource_name} is missing resource state")
else:
spec = yaml.safe_dump(resource)
st.code(spec, language="yaml")
21 changes: 21 additions & 0 deletions dlt/helpers/streamlit_app/blocks/show_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import List

import dlt
import streamlit as st

from dlt.helpers.streamlit_app.utils import query_data


def show_data_button(pipeline: dlt.Pipeline, table_name: str) -> None:
if st.button("SHOW DATA", key=table_name):
df = query_data(pipeline, f"SELECT * FROM {table_name}", chunk_size=2048)
if df is None:
st.text("No rows returned")
else:
rows_count = df.shape[0]
if df.shape[0] < 2048:
st.text(f"All {rows_count} row(s)")
else:
st.text(f"Top {rows_count} row(s)")

st.dataframe(df)
Loading

0 comments on commit 3a815bc

Please sign in to comment.