Code Cleanup - Github source #295

Merged · 5 commits · Dec 7, 2023
171 changes: 7 additions & 164 deletions sources/github/__init__.py
@@ -1,14 +1,11 @@
"""Source that load github issues, pull requests and reactions for a specific repository via customizable graphql query. Loads events incrementally."""
import urllib.parse
from typing import Any, Iterator, List, Sequence, Tuple
from typing import Iterator, Sequence

import dlt
from dlt.common.typing import StrAny, DictStrAny, TDataItems
from dlt.common.utils import chunks
from dlt.common.typing import TDataItems
from dlt.sources import DltResource
from dlt.sources.helpers import requests

from .queries import ISSUES_QUERY, RATE_LIMIT, COMMENT_REACTIONS_QUERY
from .helpers import get_reactions_data, get_rest_pages


@dlt.source
@@ -41,7 +38,7 @@ def github_reactions(
"""
return (
dlt.resource(
_get_reactions_data(
get_reactions_data(
"issues",
owner,
name,
@@ -54,7 +51,7 @@ def github_reactions(
write_disposition="replace",
),
dlt.resource(
_get_reactions_data(
get_reactions_data(
"pullRequests",
owner,
name,
@@ -94,12 +91,12 @@ def repo_events(
"created_at", initial_value="1970-01-01T00:00:00Z", last_value_func=max
)
) -> Iterator[TDataItems]:
repos_path = "/repos/%s/%s/events" % (
repos_path = "/repos/{}/{}/events".format(
urllib.parse.quote(owner),
urllib.parse.quote(name),
)

for page in _get_rest_pages(access_token, repos_path + "?per_page=100"):
for page in get_rest_pages(access_token, repos_path + "?per_page=100"):
yield page

# stop requesting pages if the last element was already older than initial value
@@ -111,157 +108,3 @@ def repo_events(
break

return repo_events


def _get_reactions_data(
node_type: str,
owner: str,
name: str,
access_token: str,
items_per_page: int,
max_items: int,
max_item_age_seconds: float = None,
) -> Iterator[Iterator[StrAny]]:
variables = {
"owner": owner,
"name": name,
"issues_per_page": items_per_page,
"first_reactions": 100,
"first_comments": 100,
"node_type": node_type,
}
for page_items in _get_graphql_pages(
access_token, ISSUES_QUERY % node_type, variables, node_type, max_items
):
# use reactionGroups to query for reactions to comments that have any reactions. reduces cost by 10-50x
reacted_comment_ids = {}
for item in page_items:
for comment in item["comments"]["nodes"]:
if any(group["createdAt"] for group in comment["reactionGroups"]):
# print(f"for comment {comment['id']}: has reaction")
reacted_comment_ids[comment["id"]] = comment
# if "reactionGroups" in comment:
comment.pop("reactionGroups", None)

# get comment reactions by querying comment nodes separately
comment_reactions = _get_comment_reaction(
list(reacted_comment_ids.keys()), access_token
)
# attach the reaction nodes where they should be
for comment in comment_reactions.values():
comment_id = comment["id"]
reacted_comment_ids[comment_id]["reactions"] = comment["reactions"]
yield map(_extract_nested_nodes, page_items)


def _extract_top_connection(data: StrAny, node_type: str) -> StrAny:
assert (
isinstance(data, dict) and len(data) == 1
), f"The data with list of {node_type} must be a dictionary and contain only one element"
data = next(iter(data.values()))
return data[node_type] # type: ignore


def _extract_nested_nodes(item: DictStrAny) -> DictStrAny:
"""Recursively moves `nodes` and `totalCount` to reduce nesting"""

item["reactions_totalCount"] = item["reactions"].get("totalCount", 0)
item["reactions"] = item["reactions"]["nodes"]
comments = item["comments"]
item["comments_totalCount"] = item["comments"].get("totalCount", 0)
for comment in comments["nodes"]:
if "reactions" in comment:
comment["reactions_totalCount"] = comment["reactions"].get("totalCount", 0)
comment["reactions"] = comment["reactions"]["nodes"]
item["comments"] = comments["nodes"]
return item


def _get_auth_header(access_token: str) -> StrAny:
if access_token:
return {"Authorization": f"Bearer {access_token}"}
else:
# REST API works without access token (but with low rate limits)
return {}


def _run_graphql_query(
access_token: str, query: str, variables: DictStrAny
) -> Tuple[StrAny, StrAny]:
def _request() -> requests.Response:
r = requests.post(
"https://api.github.com/graphql",
json={"query": query, "variables": variables},
headers=_get_auth_header(access_token),
)
return r

data = _request().json()
if "errors" in data:
raise ValueError(data)
data = data["data"]
# pop rate limits
rate_limit = data.pop("rateLimit", {"cost": 0, "remaining": 0})
return data, rate_limit


def _get_graphql_pages(
access_token: str, query: str, variables: DictStrAny, node_type: str, max_items: int
) -> Iterator[List[DictStrAny]]:
items_count = 0
while True:
data, rate_limit = _run_graphql_query(access_token, query, variables)
data_items = _extract_top_connection(data, node_type)["nodes"]
items_count += len(data_items)
print(
f'Got {len(data_items)}/{items_count} {node_type}s, query cost {rate_limit["cost"]}, remaining credits: {rate_limit["remaining"]}'
)
if data_items:
yield data_items
else:
return
# print(data["repository"][node_type]["pageInfo"]["endCursor"])
variables["page_after"] = _extract_top_connection(data, node_type)["pageInfo"][
"endCursor"
]
if max_items and items_count >= max_items:
print(f"Max items limit reached: {items_count} >= {max_items}")
return


def _get_comment_reaction(comment_ids: List[str], access_token: str) -> StrAny:
"""Builds a query from a list of comment nodes and returns associated reactions"""
idx = 0
data: DictStrAny = {}
for page_chunk in chunks(comment_ids, 50):
subs = []
for comment_id in page_chunk:
subs.append(COMMENT_REACTIONS_QUERY % (idx, comment_id))
idx += 1
subs.append(RATE_LIMIT)
query = "{" + ",\n".join(subs) + "}"
# print(query)
page, rate_limit = _run_graphql_query(access_token, query, {})
print(
f'Got {len(page)} comments, query cost {rate_limit["cost"]}, remaining credits: {rate_limit["remaining"]}'
)
data.update(page)
return data


def _get_rest_pages(access_token: str, query: str) -> Iterator[List[StrAny]]:
def _request(url: str) -> requests.Response:
r = requests.get(url, headers=_get_auth_header(access_token))
print(f"got page {url}, requests left: " + r.headers["x-ratelimit-remaining"])
return r

url = "https://api.github.com" + query
while True:
r: requests.Response = _request(url)
page_items = r.json()
if len(page_items) == 0:
break
yield page_items
if "next" not in r.links:
break
url = r.links["next"]["url"]
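
For orientation, a minimal sketch of consuming the source above with dlt. The pipeline name, destination, and repository here are illustrative assumptions, not part of this PR, and the access token is assumed to be supplied via dlt secrets:

import dlt

# Assumes the sources/ directory is importable so the github package resolves.
from github import github_reactions

# Hypothetical pipeline setup; any dlt destination works the same way.
pipeline = dlt.pipeline(
    pipeline_name="github_reactions",
    destination="duckdb",
    dataset_name="github_data",
)

# Runs the issue and pull request reaction resources defined by the source;
# access_token is assumed to be injected from secrets by the source itself.
load_info = pipeline.run(github_reactions("dlt-hub", "dlt"))
print(load_info)

The repo_events resource in the same module pairs with this: it uses dlt.sources.incremental on created_at, so repeated runs only fetch events newer than the last seen timestamp.
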
38 changes: 25 additions & 13 deletions sources/github/helpers.py
@@ -1,11 +1,11 @@
"""Github source helpers"""

from .queries import ISSUES_QUERY, RATE_LIMIT, COMMENT_REACTIONS_QUERY
from typing import Any, Iterator, List, Sequence, Tuple
from dlt.common.typing import StrAny, DictStrAny, TDataItems
from dlt.sources.helpers import requests
from typing import Iterator, List, Optional, Tuple
from dlt.common.typing import DictStrAny, StrAny
from dlt.common.utils import chunks
from .settings import REST_API_BASE_URL, GRAPHQL_API_BASE_URL
from dlt.sources.helpers import requests
from .queries import COMMENT_REACTIONS_QUERY, ISSUES_QUERY, RATE_LIMIT
from .settings import GRAPHQL_API_BASE_URL, REST_API_BASE_URL


#
@@ -23,16 +23,23 @@ def _get_auth_header(access_token: str) -> StrAny:
# Rest API helpers
#
def get_rest_pages(access_token: str, query: str) -> Iterator[List[StrAny]]:
url = REST_API_BASE_URL + query
def _request(page_url: str) -> requests.Response:
r = requests.get(page_url, headers=_get_auth_header(access_token))
print(
f"got page {page_url}, requests left: " + r.headers["x-ratelimit-remaining"]
)
return r

next_page_url = REST_API_BASE_URL + query
while True:
r = requests.get(url, headers=_get_auth_header(access_token))
r: requests.Response = _request(next_page_url)
page_items = r.json()
if len(page_items) == 0:
break
yield page_items
if "next" not in r.links:
break
url = r.links["next"]["url"]
next_page_url = r.links["next"]["url"]


#
@@ -45,6 +52,7 @@ def get_reactions_data(
access_token: str,
items_per_page: int,
max_items: int,
max_item_age_seconds: Optional[float] = None,
) -> Iterator[Iterator[StrAny]]:
variables = {
"owner": owner,
@@ -104,11 +112,15 @@ def _extract_nested_nodes(item: DictStrAny) -> DictStrAny:
def _run_graphql_query(
access_token: str, query: str, variables: DictStrAny
) -> Tuple[StrAny, StrAny]:
data = requests.post(
GRAPHQL_API_BASE_URL,
json={"query": query, "variables": variables},
headers=_get_auth_header(access_token),
).json()
def _request() -> requests.Response:
r = requests.post(
GRAPHQL_API_BASE_URL,
json={"query": query, "variables": variables},
headers=_get_auth_header(access_token),
)
return r

data = _request().json()
if "errors" in data:
raise ValueError(data)
data = data["data"]
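
For reference, a minimal sketch of calling the REST helper above directly; it pages through results by following the Link response header until no next page remains. The repository path and the empty token are illustrative assumptions:

from github.helpers import get_rest_pages

# An empty token sends unauthenticated requests, which fall under
# GitHub's lower anonymous rate limit.
for page in get_rest_pages("", "/repos/dlt-hub/dlt/events?per_page=100"):
    for event in page:
        print(event["type"], event["created_at"])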