From 21fb0ca0d7c94a29eeef335967739d7ab2424aed Mon Sep 17 00:00:00 2001 From: Andrey Date: Sat, 9 Dec 2023 06:42:36 +0200 Subject: [PATCH 1/2] Add batching leaderboard generator. --- .../mooncrawl/leaderboards_generator/cli.py | 66 ++++++--- .../mooncrawl/leaderboards_generator/utils.py | 126 +++++++++++++++++- crawlers/mooncrawl/mooncrawl/settings.py | 4 + 3 files changed, 174 insertions(+), 22 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py index 39b8f0f7..7e382109 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py @@ -7,12 +7,14 @@ import requests # type: ignore from bugout.data import BugoutSearchResult -from .utils import get_results_for_moonstream_query +from .utils import get_results_for_moonstream_query, leaderboard_push_batch from ..settings import ( MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID, MOONSTREAM_API_URL, MOONSTREAM_ENGINE_URL, + MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE, + MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS, ) from ..settings import bugout_client as bc @@ -35,10 +37,15 @@ def handle_leaderboards(args: argparse.Namespace) -> None: ### get leaderboard journal + leaderboard_push_batch_size = args.leaderboard_push_batch_size + + leaderboard_push_timeout_seconds = args.leaderboard_push_timeout_seconds + query = "#leaderboard #status:active" - if args.leaderboard_id: # way to run only one leaderboard - query += f" #leaderboard_id:{args.leaderboard_id}" + if args.leaderboard_id: # way to run only one leaderboard without status:active + query = f"#leaderboard #leaderboard_id:{args.leaderboard_id}" + try: leaderboards = bc.search( token=MOONSTREAM_ADMIN_ACCESS_TOKEN, @@ -116,26 +123,33 @@ def handle_leaderboards(args: argparse.Namespace) -> None: "Content-Type": "application/json", } - try: - leaderboard_api_response = requests.put( - leaderboard_push_api_url, - json=query_results["data"], - headers=leaderboard_api_headers, - timeout=10, + if len(query_results["data"]) > leaderboard_push_batch_size: + logger.info( + f"Pushing {len(query_results['data'])} scores to leaderboard {leaderboard_id} in batches of {leaderboard_push_batch_size}" ) - except Exception as e: - logger.error( - f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}" + leaderboard_push_batch( + leaderboard_id, + leaderboard_data, + query_results["data"], + leaderboard_api_headers, + leaderboard_push_batch_size, + timeout=leaderboard_push_timeout_seconds, ) - continue - try: - leaderboard_api_response.raise_for_status() - except requests.exceptions.HTTPError as http_error: - logger.error( - f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}" - ) - continue + else: + try: + leaderboard_api_response = requests.put( + leaderboard_push_api_url, + json=query_results["data"], + headers=leaderboard_api_headers, + timeout=leaderboard_push_timeout_seconds, + ) + leaderboard_api_response.raise_for_status() + except requests.exceptions.HTTPError as http_error: + logger.error( + f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}" + ) + continue ### get leaderboard from leaderboard API @@ -213,6 +227,18 @@ def main(): required=True, help="Moonstream Access Token to use for Moonstream Query API requests", ) + leaderboard_generator_parser.add_argument( + "--leaderboard-push-batch-size", + type=int, + default=MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE, + help="Number of scores to push to leaderboard API at once", + ) + leaderboard_generator_parser.add_argument( + "--leaderboard-push-timeout-seconds", + type=int, + default=MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS, + help="Timeout for leaderboard API requests", + ) leaderboard_generator_parser.set_defaults(func=handle_leaderboards) diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py index fc4d1e01..1e08f668 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py @@ -3,12 +3,17 @@ import logging import os import time -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, List import requests # type: ignore -from ..settings import MOONSTREAM_API_URL +from ..settings import ( + MOONSTREAM_API_URL, + MOONSTREAM_ENGINE_URL, + MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE, + MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS, +) logging.basicConfig() @@ -101,3 +106,120 @@ def get_results_for_moonstream_query( keep_going = num_retries <= max_retries return result + + +def get_data_from_url(url): + response = requests.get(url) + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get data: HTTP {response.status_code}") + + +def chunk_data(data, chunk_size=100000): + for i in range(0, len(data), chunk_size): + yield data[i : i + chunk_size] + + +def send_data_to_endpoint(chunks, endpoint_url, headers, timeout=10): + for index, chunk in enumerate(chunks): + try: + logger.info(f"Pushing chunk {index} to leaderboard API") + response = requests.put( + endpoint_url, headers=headers, json=chunk, timeout=timeout + ) + + response.raise_for_status() + except requests.exceptions.HTTPError as http_error: + logger.error( + f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}" + ) + continue + + +def leaderboard_push_batch( + leaderboard_id: str, + leaderboard_config: Dict[str, Any], + data: List[Dict[str, Any]], + headers: Dict[str, str], + batch_size: int = MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE, + timeout: int = 10, +) -> None: + """ + Push leaderboard data to the leaderboard API in batches. + """ + + ## first step create leaderboard version + + leaderboard_version_api_url = ( + f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions" + ) + + json_data = { + "publish": False, + } + + leaderboard_api_response = requests.post( + leaderboard_version_api_url, json=json_data, headers=headers, timeout=5 + ) + + try: + leaderboard_api_response.raise_for_status() + except requests.exceptions.HTTPError as http_error: + logger.error( + f"Could not create leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}" + ) + return + + leaderboard_version_id = leaderboard_api_response.json()["version"] + + ## second step push data to leaderboard version + + leaderboard_version_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id}/scores?normalize_addresses={leaderboard_config['normalize_addresses']}&overwrite=false" + + chunks = chunk_data(data, chunk_size=batch_size) + + send_data_to_endpoint( + chunks, leaderboard_version_push_api_url, headers, timeout=timeout + ) + + ## third step publish leaderboard version + + leaderboard_version_publish_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id}" + + json_data = { + "publish": True, + } + + try: + leaderboard_api_response = requests.put( + leaderboard_version_publish_api_url, + json=json_data, + headers=headers, + timeout=5, + ) + + leaderboard_api_response.raise_for_status() + except requests.exceptions.HTTPError as http_error: + logger.error( + f"Could not publish leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}" + ) + return + + ## delete leaderboard version -1 + + try: + leaderboard_version_delete_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id - 1}" + + leaderboard_api_response = requests.delete( + leaderboard_version_delete_api_url, + headers=headers, + timeout=5, + ) + + leaderboard_api_response.raise_for_status() + except requests.exceptions.HTTPError as http_error: + logger.error( + f"Could not delete leaderboard version: {http_error.response.text} with status code {http_error.response.status_code}" + ) + return diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 79a3a8a5..2faafec2 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -321,3 +321,7 @@ raise ValueError( "MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID environment variable must be set" ) + + +MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 20000 +MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60 From 8a2e624e097a002a9b0b1b92de50a9032ddcb546 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 13 Dec 2023 21:39:52 +0200 Subject: [PATCH 2/2] Add changes. --- .../mooncrawl/leaderboards_generator/utils.py | 15 +++++---------- crawlers/mooncrawl/mooncrawl/settings.py | 2 +- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py index 1e08f668..98135370 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py @@ -116,11 +116,6 @@ def get_data_from_url(url): raise Exception(f"Failed to get data: HTTP {response.status_code}") -def chunk_data(data, chunk_size=100000): - for i in range(0, len(data), chunk_size): - yield data[i : i + chunk_size] - - def send_data_to_endpoint(chunks, endpoint_url, headers, timeout=10): for index, chunk in enumerate(chunks): try: @@ -134,7 +129,7 @@ def send_data_to_endpoint(chunks, endpoint_url, headers, timeout=10): logger.error( f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}" ) - continue + raise http_error def leaderboard_push_batch( @@ -160,7 +155,7 @@ def leaderboard_push_batch( } leaderboard_api_response = requests.post( - leaderboard_version_api_url, json=json_data, headers=headers, timeout=5 + leaderboard_version_api_url, json=json_data, headers=headers, timeout=10 ) try: @@ -177,7 +172,7 @@ def leaderboard_push_batch( leaderboard_version_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/versions/{leaderboard_version_id}/scores?normalize_addresses={leaderboard_config['normalize_addresses']}&overwrite=false" - chunks = chunk_data(data, chunk_size=batch_size) + chunks = [data[x : x + batch_size] for x in range(0, len(data), batch_size)] send_data_to_endpoint( chunks, leaderboard_version_push_api_url, headers, timeout=timeout @@ -196,7 +191,7 @@ def leaderboard_push_batch( leaderboard_version_publish_api_url, json=json_data, headers=headers, - timeout=5, + timeout=10, ) leaderboard_api_response.raise_for_status() @@ -214,7 +209,7 @@ def leaderboard_push_batch( leaderboard_api_response = requests.delete( leaderboard_version_delete_api_url, headers=headers, - timeout=5, + timeout=timeout, ) leaderboard_api_response.raise_for_status() diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 2faafec2..ff843373 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -323,5 +323,5 @@ ) -MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 20000 +MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000 MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60