forked from dbt-labs/jaffle-shop-classic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hackernews.py
83 lines (67 loc) · 3.3 KB
/
hackernews.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import base64
from io import BytesIO
from typing import List
import matplotlib.pyplot as plt
import pandas as pd
import requests
from dagster import AssetExecutionContext, MetadataValue, asset
from wordcloud import STOPWORDS, WordCloud
@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstory_ids() -> List[int]:
    """Get up to 500 top stories from the HackerNews topstories endpoint.

    API Docs: https://github.com/HackerNews/API#new-top-and-best-stories

    Returns:
        The decoded JSON body of the topstories endpoint (a list of story ids).

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
        requests.Timeout: If the endpoint does not respond within the timeout.
    """
    topstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    # requests applies no timeout by default; without one a stalled connection
    # would block the run indefinitely.
    response = requests.get(topstories_url, timeout=30)
    # Fail loudly on an HTTP error instead of materializing an error payload
    # as if it were the list of story ids.
    response.raise_for_status()
    return response.json()
@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstories(
    context: AssetExecutionContext, hackernews_topstory_ids: List[int]
) -> pd.DataFrame:
    """Get items based on story ids from the HackerNews items endpoint.

    It may take 1-2 minutes to fetch all 500 items.
    API Docs: https://github.com/HackerNews/API#items

    Args:
        context: Dagster execution context, used for logging and for attaching
            output metadata.
        hackernews_topstory_ids: Story ids to fetch, one request per id.

    Returns:
        A DataFrame with one row per item that was returned by the endpoint.

    Raises:
        requests.HTTPError: If any item request answers with a 4xx/5xx status.
        requests.Timeout: If any item request exceeds the timeout.
    """
    results = []
    # A single Session reuses the underlying connection across the per-item
    # requests instead of opening a fresh one for every id.
    with requests.Session() as session:
        for item_id in hackernews_topstory_ids:
            # requests applies no timeout by default; bound each call so one
            # stalled request cannot hang the whole run.
            response = session.get(
                f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
                timeout=30,
            )
            response.raise_for_status()
            item = response.json()
            if item is None:
                # A JSON `null` body decodes to None; mixing None with dicts
                # would break the DataFrame construction below, so skip it.
                context.log.warning(f"No item returned for id {item_id}; skipping.")
                continue
            results.append(item)
            if len(results) % 20 == 0:
                context.log.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)

    # Dagster supports attaching arbitrary metadata to asset materializations. This metadata will be
    # shown in the run logs and also be displayed on the "Activity" tab of the "Asset Details" page in the UI.
    # This metadata would be useful for monitoring and maintaining the asset as you iterate.
    # Read more about asset metadata in https://docs.dagster.io/concepts/assets/software-defined-assets#recording-materialization-metadata
    context.add_output_metadata(
        {
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )
    return df
@asset(group_name="hackernews", compute_kind="Plot")
def hackernews_topstories_word_cloud(
    context: AssetExecutionContext, hackernews_topstories: pd.DataFrame
) -> bytes:
    """Exploratory analysis: Generate a word cloud from the current top 500 HackerNews top stories.

    Embed the plot into a Markdown metadata for quick view.
    Read more about how to create word clouds in http://amueller.github.io/word_cloud/.

    Args:
        context: Dagster execution context, used for attaching output metadata.
        hackernews_topstories: DataFrame of items; its "title" column supplies
            the text for the word cloud.

    Returns:
        The rendered PNG image, base64-encoded, as bytes.
    """
    stopwords = set(STOPWORDS)
    stopwords.update(["Ask", "Show", "HN"])
    # Drop missing titles first: str(NaN) is the literal "nan", which would
    # otherwise show up as a word in the cloud.
    titles = hackernews_topstories["title"].dropna()
    titles_text = " ".join(str(title) for title in titles)
    titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(titles_text)

    # Generate the word cloud image
    fig = plt.figure(figsize=(8, 8), facecolor=None)
    try:
        plt.imshow(titles_cloud, interpolation="bilinear")
        plt.axis("off")
        plt.tight_layout(pad=0)

        # Save the image to a buffer and embed the image into Markdown content for quick view
        buffer = BytesIO()
        fig.savefig(buffer, format="png")
    finally:
        # pyplot keeps every figure alive until it is explicitly closed, so
        # release it here to avoid accumulating figures across materializations.
        plt.close(fig)

    image_data = base64.b64encode(buffer.getvalue())
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    # Attach the Markdown content as metadata to the asset
    # Read about more metadata types in https://docs.dagster.io/_apidocs/ops#metadata-types
    context.add_output_metadata({"plot": MetadataValue.md(md_content)})
    return image_data