Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code Cleanup - Github source #295

Merged
merged 5 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 1 addition & 154 deletions sources/github/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dlt.sources.helpers import requests

from .queries import ISSUES_QUERY, RATE_LIMIT, COMMENT_REACTIONS_QUERY
from .helpers import _get_rest_pages, _get_reactions_data


@dlt.source
Expand Down Expand Up @@ -111,157 +112,3 @@ def repo_events(
break

return repo_events


def _get_reactions_data(
    node_type: str,
    owner: str,
    name: str,
    access_token: str,
    items_per_page: int,
    max_items: int,
    max_item_age_seconds: "Optional[float]" = None,  # was `float = None`: implicit Optional is disallowed by PEP 484
) -> Iterator[Iterator[StrAny]]:
    """Yield pages of `node_type` items (e.g. issues or pull requests) of a repository,
    with reactions attached to the items and to their comments.

    Args:
        node_type: GraphQL connection name interpolated into ISSUES_QUERY.
        owner: repository owner login.
        name: repository name.
        access_token: GitHub token used for the GraphQL API.
        items_per_page: page size for the top-level items query.
        max_items: stop paginating once this many items were fetched (checked per page).
        max_item_age_seconds: currently unused — TODO(review): implement age filtering or drop it.

    Yields:
        Per fetched page, an iterator of flattened item dicts (see `_extract_nested_nodes`).
    """
    variables = {
        "owner": owner,
        "name": name,
        "issues_per_page": items_per_page,
        "first_reactions": 100,
        "first_comments": 100,
        "node_type": node_type,
    }
    for page_items in _get_graphql_pages(
        access_token, ISSUES_QUERY % node_type, variables, node_type, max_items
    ):
        # use reactionGroups to query for reactions to comments that have any reactions. reduces cost by 10-50x
        reacted_comment_ids = {}
        for item in page_items:
            for comment in item["comments"]["nodes"]:
                if any(group["createdAt"] for group in comment["reactionGroups"]):
                    reacted_comment_ids[comment["id"]] = comment
                # drop the marker groups: the real reaction nodes are fetched below
                comment.pop("reactionGroups", None)

        # get comment reactions by querying comment nodes separately
        comment_reactions = _get_comment_reaction(
            list(reacted_comment_ids.keys()), access_token
        )
        # attach the reaction nodes where they should be
        for comment in comment_reactions.values():
            comment_id = comment["id"]
            reacted_comment_ids[comment_id]["reactions"] = comment["reactions"]
        yield map(_extract_nested_nodes, page_items)


def _extract_top_connection(data: StrAny, node_type: str) -> StrAny:
assert (
isinstance(data, dict) and len(data) == 1
), f"The data with list of {node_type} must be a dictionary and contain only one element"
data = next(iter(data.values()))
return data[node_type] # type: ignore


def _extract_nested_nodes(item: DictStrAny) -> DictStrAny:
"""Recursively moves `nodes` and `totalCount` to reduce nesting"""

item["reactions_totalCount"] = item["reactions"].get("totalCount", 0)
item["reactions"] = item["reactions"]["nodes"]
comments = item["comments"]
item["comments_totalCount"] = item["comments"].get("totalCount", 0)
for comment in comments["nodes"]:
if "reactions" in comment:
comment["reactions_totalCount"] = comment["reactions"].get("totalCount", 0)
comment["reactions"] = comment["reactions"]["nodes"]
item["comments"] = comments["nodes"]
return item


def _get_auth_header(access_token: str) -> StrAny:
if access_token:
return {"Authorization": f"Bearer {access_token}"}
else:
# REST API works without access token (with high rate limits)
return {}


def _run_graphql_query(
    access_token: str, query: str, variables: DictStrAny
) -> Tuple[StrAny, StrAny]:
    """POST *query* with *variables* to the GitHub GraphQL endpoint.

    Returns a (data, rate_limit) pair; `rateLimit` is popped out of the payload.
    Raises ValueError when the response contains GraphQL errors.
    """
    response = requests.post(
        "https://api.github.com/graphql",
        json={"query": query, "variables": variables},
        headers=_get_auth_header(access_token),
    )
    payload = response.json()
    if "errors" in payload:
        raise ValueError(payload)
    result = payload["data"]
    # pop rate limits; queries are expected to request rateLimit, fall back to zeros otherwise
    rate_limit = result.pop("rateLimit", {"cost": 0, "remaining": 0})
    return result, rate_limit


def _get_graphql_pages(
    access_token: str, query: str, variables: DictStrAny, node_type: str, max_items: int
) -> Iterator[List[DictStrAny]]:
    """Paginate a GraphQL connection query, yielding one list of nodes per page.

    Stops when a page comes back empty or when *max_items* (if truthy) is reached.
    Mutates *variables* by writing the `page_after` cursor between requests.
    """
    total_seen = 0
    while True:
        data, rate_limit = _run_graphql_query(access_token, query, variables)
        connection = _extract_top_connection(data, node_type)
        nodes = connection["nodes"]
        total_seen += len(nodes)
        print(
            f'Got {len(nodes)}/{total_seen} {node_type}s, query cost {rate_limit["cost"]}, remaining credits: {rate_limit["remaining"]}'
        )
        if not nodes:
            return
        yield nodes
        # advance cursor-based pagination for the next request
        variables["page_after"] = connection["pageInfo"]["endCursor"]
        if max_items and total_seen >= max_items:
            print(f"Max items limit reached: {total_seen} >= {max_items}")
            return


def _get_comment_reaction(comment_ids: List[str], access_token: str) -> StrAny:
    """Builds a query from a list of comment nodes and returns associated reactions.

    Comment ids are batched 50 per request; each aliased sub-query gets a unique
    index so the merged response keys do not collide across batches.
    """
    alias_idx = 0
    merged: DictStrAny = {}
    for id_chunk in chunks(comment_ids, 50):
        fragments = []
        for cid in id_chunk:
            fragments.append(COMMENT_REACTIONS_QUERY % (alias_idx, cid))
            alias_idx += 1
        fragments.append(RATE_LIMIT)
        batched_query = "{" + ",\n".join(fragments) + "}"
        page, rate_limit = _run_graphql_query(access_token, batched_query, {})
        print(
            f'Got {len(page)} comments, query cost {rate_limit["cost"]}, remaining credits: {rate_limit["remaining"]}'
        )
        merged.update(page)
    return merged


def _get_rest_pages(access_token: str, query: str) -> Iterator[List[StrAny]]:
    """Follow GitHub REST `Link`-header pagination, yielding one JSON page per request.

    Stops on the first empty page or when no `next` link is present.
    """
    url = "https://api.github.com" + query
    while True:
        response = requests.get(url, headers=_get_auth_header(access_token))
        print(f"got page {url}, requests left: " + response.headers["x-ratelimit-remaining"])
        page_items = response.json()
        if not page_items:
            break
        yield page_items
        next_link = response.links.get("next")
        if next_link is None:
            break
        url = next_link["url"]
28 changes: 19 additions & 9 deletions sources/github/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@ def _get_auth_header(access_token: str) -> StrAny:
#
# Rest API helpers
#
def get_rest_pages(access_token: str, query: str) -> Iterator[List[StrAny]]:
url = REST_API_BASE_URL + query
while True:
def _get_rest_pages(access_token: str, query: str) -> Iterator[List[StrAny]]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you import this function to another script, so imo prefix "_", which means "private function", doesn't make sense.
you can keep it like get_rest_pages.

same for get_reactions_data

Copy link
Collaborator Author

@dat-a-man dat-a-man Dec 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, okay

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

def _request(url: str) -> requests.Response:
r = requests.get(url, headers=_get_auth_header(access_token))
print(f"got page {url}, requests left: " + r.headers["x-ratelimit-remaining"])
return r

url = "https://api.github.com" + query
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use REST_API_BASE_URL

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

while True:
r: requests.Response = _request(url)
page_items = r.json()
if len(page_items) == 0:
break
Expand All @@ -38,13 +43,14 @@ def get_rest_pages(access_token: str, query: str) -> Iterator[List[StrAny]]:
#
# GraphQL API helpers
#
def get_reactions_data(
def _get_reactions_data(
node_type: str,
owner: str,
name: str,
access_token: str,
items_per_page: int,
max_items: int,
max_item_age_seconds: float = None,
) -> Iterator[Iterator[StrAny]]:
variables = {
"owner": owner,
Expand Down Expand Up @@ -104,11 +110,15 @@ def _extract_nested_nodes(item: DictStrAny) -> DictStrAny:
def _run_graphql_query(
access_token: str, query: str, variables: DictStrAny
) -> Tuple[StrAny, StrAny]:
data = requests.post(
GRAPHQL_API_BASE_URL,
json={"query": query, "variables": variables},
headers=_get_auth_header(access_token),
).json()
def _request() -> requests.Response:
r = requests.post(
"https://api.github.com/graphql",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use GRAPHQL_API_BASE_URL instead of "https://api.github.com/graphql"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

json={"query": query, "variables": variables},
headers=_get_auth_header(access_token),
)
return r

data = _request().json()
if "errors" in data:
raise ValueError(data)
data = data["data"]
Expand Down
Loading