Skip to content

Commit

Permalink
Move pipeline_state_info into blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 18, 2024
1 parent 20802a0 commit 03c22b7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 28 deletions.
31 changes: 31 additions & 0 deletions dlt/helpers/streamlit_app/blocks/pipeline_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import dlt
import streamlit as st

from dlt.common.destination.reference import WithStateSync
from dlt.helpers.streamlit_app.widgets import tag
from dlt.pipeline.state_sync import load_pipeline_state_from_destination


def pipeline_state_info(pipeline: dlt.Pipeline) -> None:
st.divider()
tag(pipeline.pipeline_name, label="Pipeline")
tag(pipeline.destination.destination_name, label="Destination")

remote_state = None
with pipeline.destination_client() as client:
if isinstance(client, WithStateSync):
remote_state = load_pipeline_state_from_destination(pipeline.pipeline_name, client)

local_state = pipeline.state

if remote_state:
remote_state_version = remote_state["_state_version"]
else:
remote_state_version = "---" # type: ignore

if remote_state_version != local_state["_state_version"]:
st.text("")
st.warning(
"Looks like that local state is not yet synchronized or synchronization is disabled",
icon="⚠️",
)
30 changes: 2 additions & 28 deletions dlt/helpers/streamlit_app/menu.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import dlt
import streamlit as st

from dlt.common.destination.reference import WithStateSync
from dlt.helpers.streamlit_app.blocks.pipeline_state import pipeline_state_info
from dlt.helpers.streamlit_app.utils import HERE
from dlt.helpers.streamlit_app.widgets import logo, stat, tag, mode_selector
from dlt.helpers.streamlit_app.widgets import logo, mode_selector
from dlt.helpers.streamlit_app.blocks.load_info import last_load_info
from dlt.pipeline.state_sync import load_pipeline_state_from_destination


def menu(pipeline: dlt.Pipeline) -> None:
Expand All @@ -15,28 +14,3 @@ def menu(pipeline: dlt.Pipeline) -> None:
st.page_link(f"{HERE}/pages/load_info.py", label="Load info", icon="💾")
pipeline_state_info(pipeline)
last_load_info(pipeline)


def pipeline_state_info(pipeline: dlt.Pipeline) -> None:
st.divider()
tag(pipeline.pipeline_name, label="Pipeline")
tag(pipeline.destination.destination_name, label="Destination")

remote_state = None
with pipeline.destination_client() as client:
if isinstance(client, WithStateSync):
remote_state = load_pipeline_state_from_destination(pipeline.pipeline_name, client)

local_state = pipeline.state

if remote_state:
remote_state_version = remote_state["_state_version"]
else:
remote_state_version = "---" # type: ignore

if remote_state_version != local_state["_state_version"]:
st.text("")
st.warning(
"Looks like that local state is not yet synchronized or synchronization is disabled",
icon="⚠️",
)

0 comments on commit 03c22b7

Please sign in to comment.