Commit

add snippets and parsing and running script
sh-rp committed Mar 12, 2024
1 parent 5275449 commit f531fca
Showing 7 changed files with 204 additions and 0 deletions.
Empty file added docs/snippets/__init__.py
18 changes: 18 additions & 0 deletions docs/snippets/performance_chunking.py
@@ -0,0 +1,18 @@
"""
description: Chunking example
tags: performance
"""

import dlt

if __name__ == "__main__":

def get_rows(limit):
yield from map(lambda n: {"row": n}, range(limit))

@dlt.resource
def database_cursor():
        # here we yield each row returned from the database separately
yield from get_rows(10000)

assert len(list(database_cursor())) == 10000
23 changes: 23 additions & 0 deletions docs/snippets/performance_chunking_2.py
@@ -0,0 +1,23 @@
"""
description: Chunking example
tags: performance
"""

import dlt

if __name__ == "__main__":

from itertools import islice

def get_rows(limit):
yield from map(lambda n: {"row": n}, range(limit))

@dlt.resource
def database_cursor_chunked():
# here we yield chunks of size 1000
rows = get_rows(10000)
while item_slice := list(islice(rows, 1000)):
print(f"got chunk of length {len(item_slice)}")
yield item_slice

assert len(list(database_cursor_chunked())) == 10000
33 changes: 33 additions & 0 deletions docs/snippets/performance_parallel_awaitables.py
@@ -0,0 +1,33 @@
"""
description: Extracting awaitables in parallel
tags: performance, extract, parallelization
"""

import dlt

if __name__ == "__main__":

import asyncio
from threading import current_thread

@dlt.resource
async def a_list_items(start, limit):
# simulate a slow REST API where you wait 0.3 sec for each item
index = start
while index < start + limit:
await asyncio.sleep(0.3)
yield index
index += 1

@dlt.transformer
async def a_get_details(item_id):
# simulate a slow REST API where you wait 0.3 sec for each item
await asyncio.sleep(0.3)
print(f"item_id {item_id} in thread {current_thread().name}")
        # just return the result; if you yield, the generator will be evaluated in the main thread
return {"row": item_id}

result = list(a_list_items(0, 10) | a_get_details)
print(result)

assert len(result) == 10
52 changes: 52 additions & 0 deletions docs/snippets/performance_parallel_extract_callables.py
@@ -0,0 +1,52 @@
"""
description: Extracting callables in parallel
tags: performance, extract, parallelization
"""

import dlt

if __name__ == "__main__":

import time
from threading import current_thread

@dlt.resource(parallelized=True)
def list_users(n_users):
for i in range(1, 1 + n_users):
            # Simulate the network delay of a REST API call fetching a page of items
if i % 10 == 0:
time.sleep(0.1)
yield i

@dlt.transformer(parallelized=True)
def get_user_details(user_id):
# Transformer that fetches details for users in a page
        time.sleep(0.1)  # Simulate the latency of a REST API call
print(f"user_id {user_id} in thread {current_thread().name}")
return {"entity": "user", "id": user_id}

@dlt.resource(parallelized=True)
def list_products(n_products):
for i in range(1, 1 + n_products):
if i % 10 == 0:
time.sleep(0.1)
yield i

@dlt.transformer(parallelized=True)
def get_product_details(product_id):
time.sleep(0.1)
print(f"product_id {product_id} in thread {current_thread().name}")
return {"entity": "product", "id": product_id}

@dlt.source
def api_data():
return [
list_users(24) | get_user_details,
list_products(32) | get_product_details,
]

    # evaluate the source and collect all the items
    # sources are iterators and are evaluated the same way inside pipeline.run
result = list(api_data())

assert len(result) == 56
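
For context, a source like `api_data()` above is normally passed to `pipeline.run` rather than materialized with `list`. A minimal sketch of that, reusing the `api_data` source defined in this snippet and assuming a local duckdb destination; the pipeline and dataset names below are made up:

    # hypothetical usage sketch: duckdb destination assumed, names are made up
    pipeline = dlt.pipeline(
        pipeline_name="parallel_demo",
        destination="duckdb",
        dataset_name="api_data",
    )
    load_info = pipeline.run(api_data())
    print(load_info)
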
75 changes: 75 additions & 0 deletions docs/website/check_snippets.py
@@ -0,0 +1,75 @@
"""
Load all snippets in the snippet folder, check whether they parse with ast, and run them.
"""
import typing as t
import os
import ast
import sys

SNIPPET_DIR = "../snippets"

def get_snippet_list() -> t.List[str]:
    """Get the list of available snippets in the snippet folder."""
    return [
        s.replace(".py", "")
        for s in os.listdir(SNIPPET_DIR)
        if s.endswith(".py") and s != "__init__.py"
    ]

def get_snippet(snippet_name: str) -> str:
"""Get the content of a snippet."""
with open(os.path.join(SNIPPET_DIR, snippet_name + ".py"), "r") as f:
return f.read()

def parse_snippet(snippet: str) -> bool:
"""Parse a snippet with ast."""
try:
ast.parse(snippet)
print("\033[92m -> Parse ok \033[0m")
return True
    except Exception:
print("\033[91m -> Failed to parse snippet, skipping run \033[0m")
return False

def run_snippet(snippet: str) -> bool:
    """Run a snippet with its stdout silenced."""
    old_stdout = sys.stdout
    try:
        with open(os.devnull, "w") as devnull:
            sys.stdout = devnull
            # snippets guard their logic with `if __name__ == "__main__"`, so execute them as the main module
            exec(snippet, {"__name__": "__main__"})
    except Exception:
        sys.stdout = old_stdout
        print("\033[91m -> Failed to run snippet\033[0m")
        return False
    sys.stdout = old_stdout
    print("\033[92m -> Run ok \033[0m")
    return True


if __name__ == "__main__":

print("Checking snippets")
snippet_list = get_snippet_list()
failed_parsing = []
failed_running = []
print(f"Found {len(snippet_list)} snippets")

# parse and run all snippets
for s in snippet_list:
print(f"Checking snippet {s}")

snippet = get_snippet(s)
if parse_snippet(snippet) is False:
failed_parsing.append(snippet)
continue

        # execute the snippet; run_snippet silences its stdout and treats it as the main module
if run_snippet(snippet) is False:
failed_running.append(snippet)

print()
if failed_parsing or failed_running:
print(f"\033[91m{len(failed_parsing)} snippets failed to parse, {len(failed_running)} snippets failed to run")
exit(1)
else:
print("\033[92mAll snippets ok")
exit(0)


3 changes: 3 additions & 0 deletions docs/website/process_docs.py
@@ -0,0 +1,3 @@
"""
Parse all markdown files, insert snippets, add tuba links, and export to the final directory.
"""
