Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batching leaderboard generator. #984

Merged
merged 2 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import requests # type: ignore
from bugout.data import BugoutSearchResult

from .utils import get_results_for_moonstream_query
from .utils import get_results_for_moonstream_query, leaderboard_push_batch
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID,
MOONSTREAM_API_URL,
MOONSTREAM_ENGINE_URL,
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
)

from ..settings import bugout_client as bc
Expand All @@ -35,10 +37,15 @@ def handle_leaderboards(args: argparse.Namespace) -> None:

### get leaderboard journal

leaderboard_push_batch_size = args.leaderboard_push_batch_size

leaderboard_push_timeout_seconds = args.leaderboard_push_timeout_seconds

query = "#leaderboard #status:active"

if args.leaderboard_id: # way to run only one leaderboard
query += f" #leaderboard_id:{args.leaderboard_id}"
if args.leaderboard_id: # way to run only one leaderboard without status:active
query = f"#leaderboard #leaderboard_id:{args.leaderboard_id}"

try:
leaderboards = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
Expand Down Expand Up @@ -116,26 +123,33 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
"Content-Type": "application/json",
}

try:
leaderboard_api_response = requests.put(
leaderboard_push_api_url,
json=query_results["data"],
headers=leaderboard_api_headers,
timeout=10,
if len(query_results["data"]) > leaderboard_push_batch_size:
logger.info(
f"Pushing {len(query_results['data'])} scores to leaderboard {leaderboard_id} in batches of {leaderboard_push_batch_size}"
)
except Exception as e:
logger.error(
f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}"
leaderboard_push_batch(
leaderboard_id,
leaderboard_data,
query_results["data"],
leaderboard_api_headers,
leaderboard_push_batch_size,
timeout=leaderboard_push_timeout_seconds,
)
continue

try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
continue
else:
try:
leaderboard_api_response = requests.put(
leaderboard_push_api_url,
json=query_results["data"],
headers=leaderboard_api_headers,
timeout=leaderboard_push_timeout_seconds,
)
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
continue

### get leaderboard from leaderboard API

Expand Down Expand Up @@ -213,6 +227,18 @@ def main():
required=True,
help="Moonstream Access Token to use for Moonstream Query API requests",
)
leaderboard_generator_parser.add_argument(
"--leaderboard-push-batch-size",
type=int,
default=MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
help="Number of scores to push to leaderboard API at once",
)
leaderboard_generator_parser.add_argument(
"--leaderboard-push-timeout-seconds",
type=int,
default=MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
help="Timeout for leaderboard API requests",
)

leaderboard_generator_parser.set_defaults(func=handle_leaderboards)

Expand Down
126 changes: 124 additions & 2 deletions crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
import logging
import os
import time
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List


import requests # type: ignore

from ..settings import MOONSTREAM_API_URL
from ..settings import (
MOONSTREAM_API_URL,
MOONSTREAM_ENGINE_URL,
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
)


logging.basicConfig()
Expand Down Expand Up @@ -101,3 +106,120 @@ def get_results_for_moonstream_query(
keep_going = num_retries <= max_retries

return result


def get_data_from_url(url):
response = requests.get(url)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get data: HTTP {response.status_code}")


def chunk_data(data, chunk_size=100000):
for i in range(0, len(data), chunk_size):
yield data[i : i + chunk_size]


def send_data_to_endpoint(chunks, endpoint_url, headers, timeout=10):
for index, chunk in enumerate(chunks):
try:
logger.info(f"Pushing chunk {index} to leaderboard API")
response = requests.put(
endpoint_url, headers=headers, json=chunk, timeout=timeout
)

response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
continue


def leaderboard_push_batch(
leaderboard_id: str,
leaderboard_config: Dict[str, Any],
data: List[Dict[str, Any]],
headers: Dict[str, str],
batch_size: int = MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE,
timeout: int = 10,
) -> None:
"""
Push leaderboard data to the leaderboard API in batches.
"""

## first step create leaderboard version

leaderboard_version_api_url = (
f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions"
)

json_data = {
"publish": False,
}

leaderboard_api_response = requests.post(
leaderboard_version_api_url, json=json_data, headers=headers, timeout=5
)

try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not create leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}"
)
return

leaderboard_version_id = leaderboard_api_response.json()["version"]

## second step push data to leaderboard version

leaderboard_version_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id}/scores?normalize_addresses={leaderboard_config['normalize_addresses']}&overwrite=false"

chunks = chunk_data(data, chunk_size=batch_size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also use a generator comprehension here instead of defining def chunk_data... function:

chunks = (data[i:i+batch_size] for i in range(0, len(data), batch_size))

See this for more details on generator comprehensions: https://python-reference.readthedocs.io/en/latest/docs/comprehensions/gen_expression.html


send_data_to_endpoint(
chunks, leaderboard_version_push_api_url, headers, timeout=timeout
)

## third step publish leaderboard version

leaderboard_version_publish_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id}"

json_data = {
"publish": True,
}

try:
leaderboard_api_response = requests.put(
leaderboard_version_publish_api_url,
json=json_data,
headers=headers,
timeout=5,
)

leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not publish leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}"
)
return

## delete leaderboard version -1

try:
leaderboard_version_delete_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id - 1}"

leaderboard_api_response = requests.delete(
leaderboard_version_delete_api_url,
headers=headers,
timeout=5,
)

leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not delete leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}"
)
return
4 changes: 4 additions & 0 deletions crawlers/mooncrawl/mooncrawl/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,7 @@
raise ValueError(
"MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID environment variable must be set"
)


MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 20000
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60
Loading