Commit 952cd14

remove unneeded function (and unrelated formatting :) )
sh-rp committed Jun 10, 2024
1 parent a00cb0a commit 952cd14
Showing 2 changed files with 18 additions and 24 deletions.
9 changes: 0 additions & 9 deletions dlt/helpers/streamlit_app/blocks/table_hints.py

@@ -62,15 +62,6 @@ def list_table_hints(pipeline: dlt.Pipeline, tables: List[TTableSchema]) -> None
table["resource"],
)

# table schema contains various hints (like clustering or partition options)
# that we do not want to show in basic view
def essentials_f(c: TColumnSchema) -> Dict[str, Any]:
return {
"name": c["name"],
"data_type": c.get("data_type"),
"nullable": c.get("nullable"),
}

st.table(
map(
lambda c: {
Expand Down
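
The deleted essentials_f helper only projected a column schema down to its name, data type, and nullability; judging from the surviving st.table(map(lambda c: ..., ...)) call, that projection now lives inline in the lambda. A minimal, self-contained sketch of the same projection (the sample column dicts below are made up, and the idea that the app feeds table["columns"].values() into the map is an assumption):

from typing import Any, Dict, Iterable, List

# Stand-in column schemas; in the Streamlit app these would come from the dlt
# table schema. The extra "cluster" hint is exactly the kind of detail the
# basic view is meant to hide.
columns: List[Dict[str, Any]] = [
    {"name": "id", "data_type": "bigint", "nullable": False, "cluster": True},
    {"name": "value", "data_type": "text", "nullable": True},
]

# The same projection the removed essentials_f performed, as an inline lambda.
essentials: Iterable[Dict[str, Any]] = map(
    lambda c: {
        "name": c["name"],
        "data_type": c.get("data_type"),
        "nullable": c.get("nullable"),
    },
    columns,
)
print(list(essentials))
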
33 changes: 18 additions & 15 deletions docs/examples/postgres_to_postgres/postgres_to_postgres.py

@@ -91,16 +91,17 @@ def pg_resource_chunked(
     order_date: str,
     load_type: str = "merge",
     columns: str = "*",
-    credentials: ConnectionStringCredentials = dlt.secrets[
-        "sources.postgres.credentials"
-    ],
+    credentials: ConnectionStringCredentials = dlt.secrets["sources.postgres.credentials"],
 ):
     print(
         f"dlt.resource write_disposition: `{load_type}` -- ",
-        f"connection string: postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}",
+        "connection string:"
+        f" postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}",
     )

-    query = f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}"  # Needed to have an idempotent query
+    query = (  # Needed to have an idempotent query
+        f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}"
+    )

     source = dlt.resource(  # type: ignore
         name=table_name,
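
A note on the reformatted default argument: dlt.secrets["sources.postgres.credentials"] resolves the source connection at call time, and the resulting ConnectionStringCredentials exposes its parts as attributes, which is what the masked connection-string print above relies on. A small sketch with a made-up connection string (the real value would live in secrets.toml):

from dlt.sources.credentials import ConnectionStringCredentials

# Hypothetical DSN; in the example it is resolved from
# dlt.secrets["sources.postgres.credentials"] rather than hard-coded.
credentials = ConnectionStringCredentials("postgresql://loader:loader@localhost:5432/chunk_db")
print(credentials.username, credentials.host, credentials.port, credentials.database)
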
@@ -133,9 +134,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):

 if __name__ == "__main__":
     # Input Handling
-    parser = argparse.ArgumentParser(
-        description="Run specific functions in the script."
-    )
+    parser = argparse.ArgumentParser(description="Run specific functions in the script.")
     parser.add_argument("--replace", action="store_true", help="Run initial load")
     parser.add_argument("--merge", action="store_true", help="Run delta load")
     args = parser.parse_args()
@@ -233,20 +232,26 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
     ).fetchone()[0]
     print(f"timestamped_schema: {timestamped_schema}")

-    target_credentials = ConnectionStringCredentials(dlt.secrets["destination.postgres.credentials"])
+    target_credentials = ConnectionStringCredentials(
+        dlt.secrets["destination.postgres.credentials"]
+    )
     # connect to destination (timestamped schema)
     conn.sql(
-        f"ATTACH 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}' AS pg_db (TYPE postgres);"
+        "ATTACH"
+        f" 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}'"
+        " AS pg_db (TYPE postgres);"
     )
     conn.sql(f"CREATE SCHEMA IF NOT EXISTS pg_db.{timestamped_schema};")

     for table in tables:
         print(
-            f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO Postgres {timestamped_schema}.{table['table_name']}"
+            f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO"
+            f" Postgres {timestamped_schema}.{table['table_name']}"
         )

         conn.sql(
-            f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS SELECT * FROM {timestamped_schema}.{table['table_name']};"
+            f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS"
+            f" SELECT * FROM {timestamped_schema}.{table['table_name']};"
         )
         conn.sql(
             f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};"
@@ -262,9 +267,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
     assert int(rows) == 9

     # 5. Cleanup and rename Schema
-    print(
-        "##################################### RENAME Schema and CLEANUP ########"
-    )
+    print("##################################### RENAME Schema and CLEANUP ########")
     try:
         con_hd = psycopg2.connect(
             dbname=target_credentials.database,
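
The hunk is cut off inside the try block, so only the psycopg2 connection setup is visible here; the actual cleanup and rename statements are not part of this commit's diff. Purely as an illustration of the usual schema-swap pattern, with every name and credential below made up:

import psycopg2

# Hypothetical credentials and schema names, not taken from the example file.
con_hd = psycopg2.connect(
    dbname="analytics",
    user="loader",
    password="loader",
    host="localhost",
    port=5432,
)
con_hd.autocommit = True
try:
    with con_hd.cursor() as cur:
        # Drop the previously published schema and promote the freshly loaded,
        # timestamped one in its place.
        cur.execute("DROP SCHEMA IF EXISTS orders_live CASCADE;")
        cur.execute('ALTER SCHEMA "orders_20240610_120000" RENAME TO orders_live;')
finally:
    con_hd.close()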
