From e43d18f7af5c6998800197b66e3dd150cbfb083e Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 25 Nov 2024 19:44:14 +0200 Subject: [PATCH 01/10] Add v3 state crawler flow. --- crawlers/mooncrawl/mooncrawl/data.py | 2 + .../mooncrawl/mooncrawl/state_crawler/cli.py | 238 ++++++++++++++++-- .../mooncrawl/mooncrawl/state_crawler/db.py | 36 ++- moonstreamapi/moonstreamapi/version.py | 2 +- 4 files changed, 242 insertions(+), 36 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index 3e76db46..d3030309 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -67,3 +67,5 @@ class ViewTasks(BaseModel): name: str outputs: List[Dict[str, Any]] address: str + customer_id: Optional[str] = None + instance_id: Optional[str] = None diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 13b994c6..80ec9444 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -8,6 +8,7 @@ from concurrent.futures._base import TimeoutError from pprint import pprint from typing import Any, Dict, List, Optional +import requests from uuid import UUID from moonstream.client import Moonstream # type: ignore @@ -17,7 +18,7 @@ from ..actions import recive_S3_data_from_query, get_all_entries_from_search from ..blockchain import connect from ..data import ViewTasks -from ..db import PrePing_SessionLocal +from ..db import PrePing_SessionLocal, create_moonstream_engine, sessionmaker from ..settings import ( bugout_client as bc, INFURA_PROJECT_ID, @@ -37,6 +38,45 @@ client = Moonstream() +def request_connection_string(costumer_id: str, token: str, instance_id: int) -> str: + """ + Request connection string from the Moonstream API. 
+ """ + response = requests.get( + f"https://mdb-v3-api.moonstream.to/customers/{costumer_id}/instances/{instance_id}/creds/seer/url", + headers={"Authorization": f"Bearer {token}"}, + ) + + response.raise_for_status() + + return response.text + + +def fetch_customers_connections( + jobs: List[Dict[str, Any]], token: str +) -> Dict[str, Any]: + + connections = {} + + instance_id = 1 + + for job in jobs: + if job.get("customer_id") is not None: + if job["customer_id"] not in connections: + connections[job["customer_id"]] = {} + if job.get("instance_id") is not None: + instance_id = job["instance_id"] + else: + instance_id = 1 + connection_string = request_connection_string( + job["customer_id"], token, instance_id + ) + + connections[job["customer_id"]][instance_id] = connection_string + + return connections + + def execute_query(query: Dict[str, Any], token: str): """ Query task example: @@ -105,6 +145,7 @@ def make_multicall( calls: List[Any], block_timestamp: int, block_number: str = "latest", + block_hash: Optional[str] = None, ) -> Any: multicall_calls = [] @@ -142,6 +183,7 @@ def make_multicall( "call_data": multicall_calls[index][1], "block_number": block_number, "block_timestamp": block_timestamp, + "block_hash": block_hash, "status": encoded_data[0], } ) @@ -157,6 +199,7 @@ def make_multicall( "call_data": multicall_calls[index][1], "block_number": block_number, "block_timestamp": block_timestamp, + "block_hash": block_hash, "status": encoded_data[0], } ) @@ -200,6 +243,8 @@ def crawl_calls_level( block_timestamp, max_batch_size=3000, min_batch_size=4, + v3=False, + block_hash=None, ): calls_of_level = [] @@ -261,6 +306,7 @@ def crawl_calls_level( call_chunk, block_timestamp, block_number, + block_hash, ) make_multicall_result = future.result(timeout=20) retry = 0 @@ -288,8 +334,9 @@ def crawl_calls_level( logger.debug(f"Retry: {retry}") # results parsing and writing to database add_to_session_count = 0 + for result in make_multicall_result: - db_view = view_call_to_label(blockchain_type, result) + db_view = view_call_to_label(blockchain_type, result, v3) db_session.add(db_view) add_to_session_count += 1 @@ -310,6 +357,7 @@ def parse_jobs( batch_size: int, moonstream_token: str, web3_uri: Optional[str] = None, + v3: bool = False, ): """ Parse jobs from list and generate web3 interfaces for each contract. 
@@ -348,6 +396,7 @@ def parse_jobs( logger.info(f"Current block number: {block_number}") block_timestamp = web3_client.eth.get_block(block_number).timestamp # type: ignore + block_hash = web3_client.eth.get_block(block_number).hash # type: ignore multicaller = Multicall2( web3_client, web3_client.toChecksumAddress(multicall_contracts[blockchain_type]) @@ -464,8 +513,23 @@ def recursive_unpack(method_abi: Any, level: int = 0) -> Any: # reverse call_tree call_tree_levels = sorted(calls.keys(), reverse=True)[:-1] - - db_session = PrePing_SessionLocal() + customers_connections_sessions = {} + if v3: + customers_connections = fetch_customers_connections(jobs, moonstream_token) + for customer_id in customers_connections: + for instance_id in customers_connections[customer_id]: + ### Create engine for each customer + engine = create_moonstream_engine( + customers_connections[customer_id][instance_id], 10, 10000 + ) + session = sessionmaker(bind=engine) + try: + customers_connections_sessions[customer_id][instance_id] = session() + except Exception as e: + logger.error(f"Connection to {engine} failed: {e}") + continue + else: + db_session = PrePing_SessionLocal() # run crawling of levels try: @@ -474,36 +538,41 @@ def recursive_unpack(method_abi: Any, level: int = 0) -> Any: logger.info(f"call_tree_levels: {call_tree_levels}") batch_size = crawl_calls_level( - web3_client, - db_session, - calls[0], - responces, - contracts_ABIs, - interfaces, - batch_size, - multicall_method, - block_number, - blockchain_type, - block_timestamp, + web3_client=web3_client, + db_session=db_session, + customers_connections=customers_connections, + calls=calls[0], + responces=responces, + contracts_ABIs=contracts_ABIs, + interfaces=interfaces, + batch_size=batch_size, + multicall_method=multicall_method, + block_number=block_number, + blockchain_type=blockchain_type, + block_timestamp=block_timestamp, + v3=v3, + block_hash=block_hash, ) for level in call_tree_levels: logger.info(f"Crawl level: {level}. Jobs amount: {len(calls[level])}") batch_size = crawl_calls_level( - web3_client, - db_session, - calls[level], - responces, - contracts_ABIs, - interfaces, - batch_size, - multicall_method, - block_number, - blockchain_type, - block_timestamp, + web3_client=web3_client, + db_session=db_session, + customers_connections=customers_connections, + calls=calls[level], + responces=responces, + contracts_ABIs=contracts_ABIs, + interfaces=interfaces, + batch_size=batch_size, + multicall_method=multicall_method, + block_number=block_number, + blockchain_type=blockchain_type, + block_timestamp=block_timestamp, + v3=v3, + block_hash=block_hash, ) - finally: db_session.close() @@ -579,6 +648,78 @@ def handle_crawl(args: argparse.Namespace) -> None: ) +def handle_crawl_v3(args: argparse.Namespace) -> None: + """ + Ability to track states of the contracts. 
+ + Read all view methods of the contracts and crawl + """ + + blockchain_type = AvailableBlockchainType(args.blockchain) + + if args.jobs_file is not None: + with open(args.jobs_file, "r") as f: + jobs = json.load(f) + + else: + + logger.info("Reading jobs from the journal") + + jobs = [] + + # Bugout + query = f"#state_job #blockchain:{blockchain_type.value}" + + existing_jobs = get_all_entries_from_search( + journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID, + search_query=query, + limit=1000, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + content=True, + ) + + if len(existing_jobs) == 0: + logger.info("No jobs found in the journal") + return + + for job in existing_jobs: + + try: + if job.content is None: + logger.error(f"Job content is None for entry {job.entry_url}") + continue + ### parse json + job_content = json.loads(job.content) + ### validate via ViewTasks + ViewTasks(**job_content) + jobs.append(job_content) + except Exception as e: + + logger.error(f"Job validation of entry {job.entry_url} failed: {e}") + continue + + custom_web3_provider = args.web3_uri + + if args.infura and INFURA_PROJECT_ID is not None: + if blockchain_type not in infura_networks: + raise ValueError( + f"Infura is not supported for {blockchain_type} blockchain type" + ) + logger.info(f"Using Infura!") + custom_web3_provider = infura_networks[blockchain_type]["url"] + + parse_jobs( + jobs, + blockchain_type, + custom_web3_provider, + args.block_number, + args.batch_size, + args.moonstream_token, + args.web3_uri, + True, + ) + + def parse_abi(args: argparse.Namespace) -> None: """ Parse the abi of the contract and save it to the database. @@ -819,6 +960,49 @@ def main() -> None: ) generate_view_parser.set_defaults(func=parse_abi) + generate_view_parser = subparsers.add_parser( + "crawl-jobs-v3", + help="continuous crawling the view methods from job structure", + ) + + generate_view_parser.add_argument( + "--moonstream-token", + "-t", + type=str, + help="Moonstream token", + required=True, + ) + generate_view_parser.add_argument( + "--blockchain", + "-b", + type=str, + help="Type of blovkchain wich writng in database", + required=True, + ) + generate_view_parser.add_argument( + "--infura", + action="store_true", + help="Use infura as web3 provider", + ) + generate_view_parser.add_argument( + "--block-number", "-N", type=str, help="Block number." 
+ ) + generate_view_parser.add_argument( + "--jobs-file", + "-j", + type=str, + help="Path to json file with jobs", + required=False, + ) + generate_view_parser.add_argument( + "--batch-size", + "-s", + type=int, + default=500, + help="Size of chunks wich send to Multicall2 contract.", + ) + generate_view_parser.set_defaults(func=handle_crawl_v3) + args = parser.parse_args() args.func(args) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/db.py b/crawlers/mooncrawl/mooncrawl/state_crawler/db.py index 3682fdc7..a0451436 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/db.py @@ -14,6 +14,7 @@ def view_call_to_label( blockchain_type: AvailableBlockchainType, call: Dict[str, Any], + v3: bool = False, label_name=VIEW_STATE_CRAWLER_LABEL, ): """ @@ -35,14 +36,33 @@ def view_call_to_label( ).replace(r"\u0000", "") ) - label = label_model( - label=label_name, - label_data=sanityzed_label_data, - address=call["address"], - block_number=call["block_number"], - transaction_hash=None, - block_timestamp=call["block_timestamp"], - ) + if v3: + + del sanityzed_label_data["type"] + del sanityzed_label_data["name"] + + label = label_model( + label=label_name, + label_name=call["name"], + label_type="view", + label_data=sanityzed_label_data, + address=call["address"], + block_number=call["block_number"], + transaction_hash="0x", + block_timestamp=call["block_timestamp"], + block_hash=call["block_hash"], + ) + + else: + + label = label_model( + label=label_name, + label_data=sanityzed_label_data, + address=call["address"], + block_number=call["block_number"], + transaction_hash=None, + block_timestamp=call["block_timestamp"], + ) return label diff --git a/moonstreamapi/moonstreamapi/version.py b/moonstreamapi/moonstreamapi/version.py index 304be8c8..932ff953 100644 --- a/moonstreamapi/moonstreamapi/version.py +++ b/moonstreamapi/moonstreamapi/version.py @@ -2,4 +2,4 @@ Moonstream library and API version. """ -MOONSTREAMAPI_VERSION = "0.4.10" +MOONSTREAMAPI_VERSION = "0.4.11" From 3f2e94ef746966999ce2af7da0ff25d6dcc51eb0 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 26 Nov 2024 15:07:57 +0200 Subject: [PATCH 02/10] Update state craler code. 
--- crawlers/mooncrawl/mooncrawl/data.py | 1 + .../mooncrawl/mooncrawl/state_crawler/cli.py | 857 +++++++++--------- .../mooncrawl/mooncrawl/state_crawler/db.py | 64 +- 3 files changed, 495 insertions(+), 427 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index d3030309..4d1918a3 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -69,3 +69,4 @@ class ViewTasks(BaseModel): address: str customer_id: Optional[str] = None instance_id: Optional[str] = None + v3: Optional[bool] = False diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 80ec9444..3dd10492 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -10,6 +10,7 @@ from typing import Any, Dict, List, Optional import requests from uuid import UUID +from web3 import Web3 from moonstream.client import Moonstream # type: ignore from moonstreamtypes.blockchain import AvailableBlockchainType @@ -38,12 +39,12 @@ client = Moonstream() -def request_connection_string(costumer_id: str, token: str, instance_id: int) -> str: +def request_connection_string(customer_id: str, instance_id: int, token: str) -> str: """ Request connection string from the Moonstream API. """ response = requests.get( - f"https://mdb-v3-api.moonstream.to/customers/{costumer_id}/instances/{instance_id}/creds/seer/url", + f"https://mdb-v3-api.moonstream.to/customers/{customer_id}/instances/{instance_id}/creds/seer/url", headers={"Authorization": f"Bearer {token}"}, ) @@ -52,31 +53,6 @@ def request_connection_string(costumer_id: str, token: str, instance_id: int) -> return response.text -def fetch_customers_connections( - jobs: List[Dict[str, Any]], token: str -) -> Dict[str, Any]: - - connections = {} - - instance_id = 1 - - for job in jobs: - if job.get("customer_id") is not None: - if job["customer_id"] not in connections: - connections[job["customer_id"]] = {} - if job.get("instance_id") is not None: - instance_id = job["instance_id"] - else: - instance_id = 1 - connection_string = request_connection_string( - job["customer_id"], token, instance_id - ) - - connections[job["customer_id"]][instance_id] = connection_string - - return connections - - def execute_query(query: Dict[str, Any], token: str): """ Query task example: @@ -140,141 +116,143 @@ def execute_query(query: Dict[str, Any], token: str): return result -def make_multicall( - multicall_method: Any, - calls: List[Any], - block_timestamp: int, - block_number: str = "latest", - block_hash: Optional[str] = None, -) -> Any: +def encode_calls(calls: List[Dict[str, Any]]) -> List[tuple]: + """Encodes the call data for multicall.""" multicall_calls = [] - for call in calls: try: - multicall_calls.append( - ( - call["address"], - call["method"].encode_data(call["inputs"]).hex(), - ) - ) + encoded_data = call["method"].encode_data(call["inputs"]).hex() + multicall_calls.append((call["address"], encoded_data)) except Exception as e: logger.error( - f'Error encoding data for method {call["method"].name} call: {call}' + f'Error encoding data for method {call["method"].name} call: {call}. 
Error: {e}' ) + return multicall_calls - multicall_result = multicall_method(False, calls=multicall_calls).call( - block_identifier=block_number + +def perform_multicall( + multicall_method: Any, multicall_calls: List[tuple], block_identifier: str +) -> Any: + """Performs the multicall and returns the result.""" + return multicall_method(False, calls=multicall_calls).call( + block_identifier=block_identifier ) - results = [] - # Handle the case with not successful calls +def process_multicall_result( + calls: List[Dict[str, Any]], + multicall_result: Any, + multicall_calls: List[tuple], + block_timestamp: int, + block_number: str, + block_hash: Optional[str], +) -> List[Dict[str, Any]]: + """Processes the multicall result and decodes the data.""" + results = [] for index, encoded_data in enumerate(multicall_result): + call = calls[index] try: - if encoded_data[0]: - results.append( - { - "result": calls[index]["method"].decode_data(encoded_data[1]), - "hash": calls[index]["hash"], - "method": calls[index]["method"], - "address": calls[index]["address"], - "name": calls[index]["method"].name, - "inputs": calls[index]["inputs"], - "call_data": multicall_calls[index][1], - "block_number": block_number, - "block_timestamp": block_timestamp, - "block_hash": block_hash, - "status": encoded_data[0], - } - ) - else: - results.append( - { - "result": calls[index]["method"].decode_data(encoded_data[1]), - "hash": calls[index]["hash"], - "method": calls[index]["method"], - "address": calls[index]["address"], - "name": calls[index]["method"].name, - "inputs": calls[index]["inputs"], - "call_data": multicall_calls[index][1], - "block_number": block_number, - "block_timestamp": block_timestamp, - "block_hash": block_hash, - "status": encoded_data[0], - } - ) + result_data = call["method"].decode_data(encoded_data[1]) + result = { + "result": result_data, + "hash": call["hash"], + "method": call["method"], + "address": call["address"], + "name": call["method"].name, + "inputs": call["inputs"], + "call_data": multicall_calls[index][1], + "block_number": block_number, + "block_timestamp": block_timestamp, + "block_hash": block_hash, + "status": encoded_data[0], + "v3": call.get("v3", False), + "customer_id": call.get("customer_id"), + "instance_id": call.get("instance_id"), + } + results.append(result) except Exception as e: - results.append( - { - "result": str(encoded_data[1]), - "hash": calls[index]["hash"], - "method": calls[index]["method"], - "address": calls[index]["address"], - "name": calls[index]["method"].name, - "inputs": calls[index]["inputs"], - "call_data": multicall_calls[index][1], - "block_number": block_number, - "block_timestamp": block_timestamp, - "status": encoded_data[0], - "error": str(e), - } - ) - + result = { + "result": str(encoded_data[1]), + "hash": call["hash"], + "method": call["method"], + "address": call["address"], + "name": call["method"].name, + "inputs": call["inputs"], + "call_data": multicall_calls[index][1], + "block_number": block_number, + "block_timestamp": block_timestamp, + "status": encoded_data[0], + "error": str(e), + "v3": call.get("v3", False), + "customer_id": call.get("customer_id"), + "instance_id": call.get("instance_id"), + } + results.append(result) logger.error( - f"Error decoding data for for method {call['method'].name} call {calls[index]}: {e}." + f"Error decoding data for method {call['method'].name} call {call}: {e}." 
) - # data is not decoded, return the encoded data logger.error(f"Encoded data: {encoded_data}") + return results + +def make_multicall( + multicall_method: Any, + calls: List[Dict[str, Any]], + block_timestamp: int, + block_number: str = "latest", + block_hash: Optional[str] = None, +) -> List[Dict[str, Any]]: + """Makes a multicall to the blockchain and processes the results.""" + multicall_calls = encode_calls(calls) + # breakpoint() + multicall_result = perform_multicall( + multicall_method, multicall_calls, block_number + ) + results = process_multicall_result( + calls, + multicall_result, + multicall_calls, + block_timestamp, + block_number, + block_hash, + ) return results -def crawl_calls_level( - web3_client, - db_session, - calls, - responces, - contracts_ABIs, - interfaces, - batch_size, - multicall_method, - block_number, - blockchain_type, - block_timestamp, - max_batch_size=3000, - min_batch_size=4, - v3=False, - block_hash=None, -): +def generate_calls_of_level( + calls: List[Dict[str, Any]], + responses: Dict[str, Any], + contracts_ABIs: Dict[str, Any], + interfaces: Dict[str, Any], +) -> List[Dict[str, Any]]: + """Generates the calls for the current level.""" calls_of_level = [] - for call in calls: - if call["generated_hash"] in responces: + if call["generated_hash"] in responses: continue parameters = [] - for input in call["inputs"]: - if type(input["value"]) in (str, int): - if input["value"] not in responces: + if isinstance(input["value"], (str, int)): + if input["value"] not in responses: parameters.append([input["value"]]) else: - if input["value"] in contracts_ABIs[call["address"]] and ( - contracts_ABIs[call["address"]][input["value"]]["name"] + if ( + input["value"] in contracts_ABIs[call["address"]] + and contracts_ABIs[call["address"]][input["value"]]["name"] == "totalSupply" - ): # hack for totalSupply TODO(Andrey): need add propper support for response parsing + ): + # Hack for totalSupply parameters.append( - list(range(1, responces[input["value"]][0][0] + 1)) + list(range(1, responses[input["value"]][0][0] + 1)) ) else: - parameters.append(responces[input["value"]]) - elif type(input["value"]) == list: + parameters.append(responses[input["value"]]) + elif isinstance(input["value"], list): parameters.append(input["value"]) else: - raise - + raise Exception("Unknown input value type") for call_parameters in itertools.product(*parameters): - # hack for tuples product - if len(call_parameters) == 1 and type(call_parameters[0]) == tuple: + if len(call_parameters) == 1 and isinstance(call_parameters[0], tuple): call_parameters = call_parameters[0] calls_of_level.append( { @@ -284,21 +262,76 @@ def crawl_calls_level( ), "hash": call["generated_hash"], "inputs": call_parameters, + "v3": call.get("v3", False), + "customer_id": call.get("customer_id"), + "instance_id": call.get("instance_id"), } ) + return calls_of_level + + +def process_results( + make_multicall_result: List[Dict[str, Any]], + db_sessions: Dict[Any, Any], + responses: Dict[str, Any], + blockchain_type: Any, +) -> int: + """Processes the results and adds them to the appropriate database sessions.""" + add_to_session_count = 0 + sessions_to_commit = set() + for result in make_multicall_result: + v3 = result.get("v3", False) + if v3: + customer_id = result.get("customer_id") + instance_id = result.get("instance_id") + db_session = db_sessions.get((customer_id, instance_id)) + else: + db_session = db_sessions.get("v2") + if db_session is None: + logger.error(f"No db_session found for result {result}") 
+ continue + db_view = view_call_to_label(blockchain_type, result, v3) + db_session.add(db_view) + sessions_to_commit.add(db_session) + add_to_session_count += 1 + if result["hash"] not in responses: + responses[result["hash"]] = [] + responses[result["hash"]].append(result["result"]) + # Commit all sessions + for session in sessions_to_commit: + commit_session(session) + logger.info(f"{add_to_session_count} labels committed to database.") + return add_to_session_count - retry = 0 +def crawl_calls_level( + web3_client: Web3, + db_sessions: Dict[Any, Any], + calls: List[Dict[str, Any]], + responses: Dict[str, Any], + contracts_ABIs: Dict[str, Any], + interfaces: Dict[str, Any], + batch_size: int, + multicall_method: Any, + block_number: str, + blockchain_type: Any, + block_timestamp: int, + max_batch_size: int = 3000, + min_batch_size: int = 4, + block_hash: Optional[str] = None, +) -> int: + """Crawls calls at a specific level.""" + calls_of_level = generate_calls_of_level( + calls, responses, contracts_ABIs, interfaces + ) + retry = 0 while len(calls_of_level) > 0: make_multicall_result = [] try: call_chunk = calls_of_level[:batch_size] - logger.info( f"Calling multicall2 with {len(call_chunk)} calls at block {block_number}" ) - - # 1 thead with timeout for hung multicall calls with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit( make_multicall, @@ -311,344 +344,315 @@ def crawl_calls_level( make_multicall_result = future.result(timeout=20) retry = 0 calls_of_level = calls_of_level[batch_size:] - logger.info(f"lenght of task left {len(calls_of_level)}.") + logger.info(f"Length of tasks left: {len(calls_of_level)}.") batch_size = min(batch_size * 2, max_batch_size) - except ValueError as e: # missing trie node + except ValueError as e: logger.error(f"ValueError: {e}, retrying") retry += 1 if "missing trie node" in str(e): time.sleep(4) if retry > 5: - raise (e) + raise e batch_size = max(batch_size // 4, min_batch_size) - except TimeoutError as e: # timeout + except TimeoutError as e: logger.error(f"TimeoutError: {e}, retrying") retry += 1 if retry > 5: - raise (e) + raise e batch_size = max(batch_size // 3, min_batch_size) except Exception as e: logger.error(f"Exception: {e}") - raise (e) + raise e time.sleep(2) logger.debug(f"Retry: {retry}") - # results parsing and writing to database - add_to_session_count = 0 - - for result in make_multicall_result: - db_view = view_call_to_label(blockchain_type, result, v3) - db_session.add(db_view) - add_to_session_count += 1 - - if result["hash"] not in responces: - responces[result["hash"]] = [] - responces[result["hash"]].append(result["result"]) - commit_session(db_session) - logger.info(f"{add_to_session_count} labels commit to database.") - + process_results(make_multicall_result, db_sessions, responses, blockchain_type) return batch_size -def parse_jobs( - jobs: List[Any], - blockchain_type: AvailableBlockchainType, +def connect_to_web3( + blockchain_type: Any, web3_provider_uri: Optional[str], - block_number: Optional[int], - batch_size: int, - moonstream_token: str, - web3_uri: Optional[str] = None, - v3: bool = False, -): - """ - Parse jobs from list and generate web3 interfaces for each contract. 
- """ - - contracts_ABIs: Dict[str, Any] = {} - contracts_methods: Dict[str, Any] = {} - calls: Dict[int, Any] = {0: []} - responces: Dict[str, Any] = {} - + web3_uri: Optional[str], +) -> Web3: + """Connects to the Web3 client.""" if web3_provider_uri is not None: try: logger.info( f"Connecting to blockchain: {blockchain_type} with custom provider!" ) - web3_client = connect( blockchain_type=blockchain_type, web3_uri=web3_provider_uri ) except Exception as e: logger.error( - f"Web3 connection to custom provider {web3_provider_uri} failed error: {e}" + f"Web3 connection to custom provider {web3_provider_uri} failed. Error: {e}" ) - raise (e) + raise e else: - logger.info(f"Connecting to blockchain: {blockchain_type} with Node balancer.") + logger.info(f"Connecting to blockchain: {blockchain_type} with node balancer.") web3_client = _retry_connect_web3( blockchain_type=blockchain_type, web3_uri=web3_uri ) - logger.info(f"Crawler started connected to blockchain: {blockchain_type}") + return web3_client + +def get_block_info(web3_client: Web3, block_number: Optional[int]) -> tuple: + """Retrieves block information.""" if block_number is None: block_number = web3_client.eth.get_block("latest").number # type: ignore - logger.info(f"Current block number: {block_number}") - - block_timestamp = web3_client.eth.get_block(block_number).timestamp # type: ignore - block_hash = web3_client.eth.get_block(block_number).hash # type: ignore - - multicaller = Multicall2( - web3_client, web3_client.toChecksumAddress(multicall_contracts[blockchain_type]) - ) - - multicall_method = multicaller.tryAggregate - - def recursive_unpack(method_abi: Any, level: int = 0) -> Any: - """ - Generate tree of calls for crawling - """ - have_subcalls = False - - ### we add queryAPI to that tree - - if method_abi["type"] == "queryAPI": - # make queryAPI call - - responce = execute_query(method_abi, token=moonstream_token) - - # generate hash for queryAPI call - - generated_hash = hashlib.md5( - json.dumps( - method_abi, - sort_keys=True, - indent=4, - separators=(",", ": "), - ).encode("utf-8") - ).hexdigest() - - # add responce to responces - - responces[generated_hash] = responce - - return generated_hash - - abi = { - "inputs": [], - "outputs": method_abi["outputs"], - "name": method_abi["name"], - "type": "function", - "stateMutability": "view", - } - - for input in method_abi["inputs"]: - if type(input["value"]) in (int, list): - abi["inputs"].append(input) - - elif type(input["value"]) == str: - abi["inputs"].append(input) - - elif type(input["value"]) == dict: - if input["value"]["type"] == "function": - hash_link = recursive_unpack(input["value"], level + 1) - # replace defenition by hash pointing to the result of the recursive_unpack - input["value"] = hash_link - have_subcalls = True - elif input["value"]["type"] == "queryAPI": - input["value"] = recursive_unpack(input["value"], level + 1) - have_subcalls = True - abi["inputs"].append(input) - abi["address"] = method_abi["address"] + block = web3_client.eth.get_block(block_number) # type: ignore + block_timestamp = block.timestamp # type: ignore + block_hash = block.hash.hex() # type: ignore + return block_number, block_timestamp, block_hash + + +def recursive_unpack( + method_abi: Any, + level: int, + calls: Dict[int, List[Any]], + contracts_methods: Dict[str, Any], + contracts_ABIs: Dict[str, Any], + responses: Dict[str, Any], + moonstream_token: str, + v3: bool, + customer_id: Optional[str] = None, + instance_id: Optional[str] = None, +) -> str: + """Recursively 
unpacks method ABIs to generate a tree of calls.""" + have_subcalls = False + if method_abi["type"] == "queryAPI": + # Make queryAPI call + response = execute_query(method_abi, token=moonstream_token) + # Generate hash for queryAPI call generated_hash = hashlib.md5( - json.dumps(abi, sort_keys=True, indent=4, separators=(",", ": ")).encode( - "utf-8" - ) + json.dumps( + method_abi, + sort_keys=True, + indent=4, + separators=(",", ": "), + ).encode("utf-8") ).hexdigest() + # Add response to responses + responses[generated_hash] = response + return generated_hash - abi["generated_hash"] = generated_hash - if have_subcalls: - level += 1 - if not calls.get(level): - calls[level] = [] - calls[level].append(abi) - else: - level = 0 + abi = { + "inputs": [], + "outputs": method_abi["outputs"], + "name": method_abi["name"], + "type": "function", + "stateMutability": "view", + "v3": v3, + "customer_id": customer_id, + "instance_id": instance_id, + } - if not calls.get(level): - calls[level] = [] - calls[level].append(abi) + for input in method_abi["inputs"]: + if isinstance(input["value"], (int, list, str)): + abi["inputs"].append(input) + elif isinstance(input["value"], dict): + if input["value"]["type"] in ["function", "queryAPI"]: + hash_link = recursive_unpack( + input["value"], + level + 1, + calls, + contracts_methods, + contracts_ABIs, + responses, + moonstream_token, + v3, + customer_id, + instance_id, + ) + input["value"] = hash_link + have_subcalls = True + abi["inputs"].append(input) + + abi["address"] = method_abi["address"] + generated_hash = hashlib.md5( + json.dumps(abi, sort_keys=True, indent=4, separators=(",", ": ")).encode( + "utf-8" + ) + ).hexdigest() + abi["generated_hash"] = generated_hash - if not contracts_methods.get(job["address"]): - contracts_methods[job["address"]] = [] - if generated_hash not in contracts_methods[job["address"]]: - contracts_methods[job["address"]].append(generated_hash) - if not contracts_ABIs.get(job["address"]): - contracts_ABIs[job["address"]] = {} - contracts_ABIs[job["address"]][generated_hash] = abi + if have_subcalls: + level += 1 + calls.setdefault(level, []).append(abi) + else: + level = 0 + calls.setdefault(level, []).append(abi) - return generated_hash + # if not contracts_methods.get(job["address"]): + # contracts_methods[job["address"]] = [] + # if generated_hash not in contracts_methods[job["address"]]: + # contracts_methods[job["address"]].append(generated_hash) + # if not contracts_ABIs.get(job["address"]): + # contracts_ABIs[job["address"]] = {} + # contracts_ABIs[job["address"]][generated_hash] = abi - for job in jobs: - if job["address"] not in contracts_ABIs: - contracts_ABIs[job["address"]] = [] + contracts_methods.setdefault(method_abi["address"], []) + if generated_hash not in contracts_methods[method_abi["address"]]: + contracts_methods[method_abi["address"]].append(generated_hash) + contracts_ABIs.setdefault(method_abi["address"], {}) + contracts_ABIs[method_abi["address"]][generated_hash] = abi - recursive_unpack(job, 0) + return generated_hash - # generate contracts interfaces +def build_interfaces( + contracts_ABIs: Dict[str, Any], contracts_methods: Dict[str, Any], web3_client: Web3 +) -> Dict[str, Any]: + """Builds contract interfaces.""" interfaces = {} - for contract_address in contracts_ABIs: - # collect abis for each contract - abis = [] - - for method_hash in contracts_methods[contract_address]: - abis.append(contracts_ABIs[contract_address][method_hash]) - - # generate interface + abis = [ + 
contracts_ABIs[contract_address][method_hash] + for method_hash in contracts_methods[contract_address] + ] interfaces[contract_address] = web3_client.eth.contract( address=web3_client.toChecksumAddress(contract_address), abi=abis ) + return interfaces - # reverse call_tree - call_tree_levels = sorted(calls.keys(), reverse=True)[:-1] - customers_connections_sessions = {} - if v3: - customers_connections = fetch_customers_connections(jobs, moonstream_token) - for customer_id in customers_connections: - for instance_id in customers_connections[customer_id]: - ### Create engine for each customer - engine = create_moonstream_engine( - customers_connections[customer_id][instance_id], 10, 10000 - ) - session = sessionmaker(bind=engine) - try: - customers_connections_sessions[customer_id][instance_id] = session() - except Exception as e: - logger.error(f"Connection to {engine} failed: {e}") - continue - else: - db_session = PrePing_SessionLocal() - # run crawling of levels +def parse_jobs( + jobs: List[Any], + blockchain_type: Any, + web3_provider_uri: Optional[str], + block_number: Optional[int], + batch_size: int, + moonstream_token: str, + web3_uri: Optional[str] = None, + customer_db_uri: Optional[str] = None, +): + """ + Parses jobs from a list and generates web3 interfaces for each contract. + """ + contracts_ABIs: Dict[str, Any] = {} + contracts_methods: Dict[str, Any] = {} + calls: Dict[int, List[Any]] = {0: []} + responses: Dict[str, Any] = {} + db_sessions: Dict[Any, Any] = {} + + web3_client = connect_to_web3(blockchain_type, web3_provider_uri, web3_uri) + block_number, block_timestamp, block_hash = get_block_info( + web3_client, block_number + ) + + multicaller = Multicall2( + web3_client, web3_client.toChecksumAddress(multicall_contracts[blockchain_type]) + ) + multicall_method = multicaller.tryAggregate + + # All sessions are stored in the dictionary db_sessions + # Under one try block try: - # initial call of level 0 all call without subcalls directly moved there + # Process jobs and create sessions + for job in jobs: + v3 = job.get("v3", False) + customer_id = job.get("customer_id") + instance_id = job.get("instance_id") + if customer_db_uri is not None: + if v3 and (customer_id, instance_id) not in db_sessions: + # Create session + engine = create_moonstream_engine(customer_db_uri, 2, 100000) + session = sessionmaker(bind=engine) + try: + db_sessions[(customer_id, instance_id)] = session() + except Exception as e: + logger.error(f"Connection to {engine} failed: {e}") + continue + else: + if "v2" not in db_sessions: + engine = create_moonstream_engine(customer_db_uri, 2, 100000) + db_sessions["v2"] = sessionmaker(bind=engine)() + elif v3: + if (customer_id, instance_id) not in db_sessions: + # Create session + # Assume fetch_connection_string fetches the connection string + connection_string = request_connection_string( + customer_id=customer_id, + instance_id=instance_id, + token=moonstream_token, + ) + engine = create_moonstream_engine(connection_string, 2, 100000) + session = sessionmaker(bind=engine) + try: + db_sessions[(customer_id, instance_id)] = session() + except Exception as e: + logger.error(f"Connection to {engine} failed: {e}") + continue + else: + if "v2" not in db_sessions: + db_sessions["v2"] = PrePing_SessionLocal() + + if job["address"] not in contracts_ABIs: + contracts_ABIs[job["address"]] = {} + + recursive_unpack( + job, + 0, + calls, + contracts_methods, + contracts_ABIs, + responses, + moonstream_token, + v3, + customer_id, + instance_id, + ) + + interfaces 
= build_interfaces(contracts_ABIs, contracts_methods, web3_client) + + call_tree_levels = sorted(calls.keys(), reverse=True)[:-1] + logger.info(f"Crawl level: 0. Jobs amount: {len(calls[0])}") - logger.info(f"call_tree_levels: {call_tree_levels}") + logger.info(f"Call tree levels: {call_tree_levels}") batch_size = crawl_calls_level( web3_client=web3_client, - db_session=db_session, - customers_connections=customers_connections, + db_sessions=db_sessions, calls=calls[0], - responces=responces, + responses=responses, contracts_ABIs=contracts_ABIs, interfaces=interfaces, batch_size=batch_size, multicall_method=multicall_method, - block_number=block_number, + block_number=block_number, # type: ignore blockchain_type=blockchain_type, block_timestamp=block_timestamp, - v3=v3, block_hash=block_hash, ) for level in call_tree_levels: logger.info(f"Crawl level: {level}. Jobs amount: {len(calls[level])}") - batch_size = crawl_calls_level( web3_client=web3_client, - db_session=db_session, - customers_connections=customers_connections, + db_sessions=db_sessions, calls=calls[level], - responces=responces, + responses=responses, contracts_ABIs=contracts_ABIs, interfaces=interfaces, batch_size=batch_size, multicall_method=multicall_method, - block_number=block_number, + block_number=block_number, # type: ignore blockchain_type=blockchain_type, block_timestamp=block_timestamp, - v3=v3, block_hash=block_hash, ) finally: - db_session.close() - - -def handle_crawl(args: argparse.Namespace) -> None: - """ - Ability to track states of the contracts. - - Read all view methods of the contracts and crawl - """ - - blockchain_type = AvailableBlockchainType(args.blockchain) - - if args.jobs_file is not None: - with open(args.jobs_file, "r") as f: - jobs = json.load(f) - - else: - - logger.info("Reading jobs from the journal") - - jobs = [] - - # Bugout - query = f"#state_job #blockchain:{blockchain_type.value}" - - existing_jobs = get_all_entries_from_search( - journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID, - search_query=query, - limit=1000, - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, - content=True, - ) - - if len(existing_jobs) == 0: - logger.info("No jobs found in the journal") - return - - for job in existing_jobs: - + # Close all sessions + for session in db_sessions.values(): try: - if job.content is None: - logger.error(f"Job content is None for entry {job.entry_url}") - continue - ### parse json - job_content = json.loads(job.content) - ### validate via ViewTasks - ViewTasks(**job_content) - jobs.append(job_content) + session.close() except Exception as e: + logger.error(f"Failed to close session: {e}") - logger.error(f"Job validation of entry {job.entry_url} failed: {e}") - continue - - custom_web3_provider = args.web3_uri - - if args.infura and INFURA_PROJECT_ID is not None: - if blockchain_type not in infura_networks: - raise ValueError( - f"Infura is not supported for {blockchain_type} blockchain type" - ) - logger.info(f"Using Infura!") - custom_web3_provider = infura_networks[blockchain_type]["url"] - parse_jobs( - jobs, - blockchain_type, - custom_web3_provider, - args.block_number, - args.batch_size, - args.moonstream_token, - args.web3_uri, - ) - - -def handle_crawl_v3(args: argparse.Namespace) -> None: +def handle_crawl(args: argparse.Namespace) -> None: """ Ability to track states of the contracts. 
@@ -716,7 +720,7 @@ def handle_crawl_v3(args: argparse.Namespace) -> None: args.batch_size, args.moonstream_token, args.web3_uri, - True, + args.customer_db_uri, ) @@ -903,6 +907,11 @@ def main() -> None: default=500, help="Size of chunks wich send to Multicall2 contract.", ) + view_state_crawler_parser.add_argument( + "--customer-db-uri", + type=str, + help="URI for the customer database", + ) view_state_crawler_parser.set_defaults(func=handle_crawl) view_state_migration_parser = subparsers.add_parser( @@ -960,48 +969,48 @@ def main() -> None: ) generate_view_parser.set_defaults(func=parse_abi) - generate_view_parser = subparsers.add_parser( - "crawl-jobs-v3", - help="continuous crawling the view methods from job structure", - ) - - generate_view_parser.add_argument( - "--moonstream-token", - "-t", - type=str, - help="Moonstream token", - required=True, - ) - generate_view_parser.add_argument( - "--blockchain", - "-b", - type=str, - help="Type of blovkchain wich writng in database", - required=True, - ) - generate_view_parser.add_argument( - "--infura", - action="store_true", - help="Use infura as web3 provider", - ) - generate_view_parser.add_argument( - "--block-number", "-N", type=str, help="Block number." - ) - generate_view_parser.add_argument( - "--jobs-file", - "-j", - type=str, - help="Path to json file with jobs", - required=False, - ) - generate_view_parser.add_argument( - "--batch-size", - "-s", - type=int, - default=500, - help="Size of chunks wich send to Multicall2 contract.", - ) - generate_view_parser.set_defaults(func=handle_crawl_v3) + # generate_view_parser = subparsers.add_parser( + # "crawl-jobs-v3", + # help="continuous crawling the view methods from job structure", + # ) + + # generate_view_parser.add_argument( + # "--moonstream-token", + # "-t", + # type=str, + # help="Moonstream token", + # required=True, + # ) + # generate_view_parser.add_argument( + # "--blockchain", + # "-b", + # type=str, + # help="Type of blovkchain wich writng in database", + # required=True, + # ) + # generate_view_parser.add_argument( + # "--infura", + # action="store_true", + # help="Use infura as web3 provider", + # ) + # generate_view_parser.add_argument( + # "--block-number", "-N", type=str, help="Block number." + # ) + # generate_view_parser.add_argument( + # "--jobs-file", + # "-j", + # type=str, + # help="Path to json file with jobs", + # required=False, + # ) + # generate_view_parser.add_argument( + # "--batch-size", + # "-s", + # type=int, + # default=500, + # help="Size of chunks wich send to Multicall2 contract.", + # ) + # generate_view_parser.set_defaults(func=handle_crawl_v3) args = parser.parse_args() args.func(args) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/db.py b/crawlers/mooncrawl/mooncrawl/state_crawler/db.py index a0451436..b2349bc9 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/db.py @@ -1,6 +1,8 @@ import json import logging from typing import Any, Dict +from hexbytes import HexBytes + from moonstreamtypes.blockchain import AvailableBlockchainType, get_label_model from sqlalchemy.orm import Session @@ -21,7 +23,8 @@ def view_call_to_label( Creates a label model. 
""" - label_model = get_label_model(blockchain_type) + version = 3 if v3 else 2 + label_model = get_label_model(blockchain_type, version=version) sanityzed_label_data = json.loads( json.dumps( @@ -41,14 +44,69 @@ def view_call_to_label( del sanityzed_label_data["type"] del sanityzed_label_data["name"] + # class EvmBasedLabel(Base): # type: ignore + # __abstract__ = True + + # id = Column( + # UUID(as_uuid=True), + # primary_key=True, + # default=uuid.uuid4, + # unique=True, + # nullable=False, + # ) + # label = Column(VARCHAR(256), nullable=False, index=True) + + # transaction_hash = Column( + # VARCHAR(128), + # nullable=False, + # index=True, + # ) + # log_index = Column(Integer, nullable=True) + + # block_number = Column( + # BigInteger, + # nullable=False, + # index=True, + # ) + # block_hash = Column(VARCHAR(256), nullable=False) + # block_timestamp = Column(BigInteger, nullable=False) + + # caller_address = Column( + # LargeBinary, + # nullable=True, + # index=True, + # ) + # origin_address = Column( + # LargeBinary, + # nullable=True, + # index=True, + # ) + + # address = Column( + # LargeBinary, + # nullable=False, + # index=True, + # ) + + # label_name = Column(Text, nullable=True, index=True) + # label_type = Column(VARCHAR(64), nullable=True, index=True) + # label_data = Column(JSONB, nullable=True) + + # created_at = Column( + # DateTime(timezone=True), server_default=utcnow(), nullable=False + # ) + + ## add zero transaction hash + label = label_model( label=label_name, label_name=call["name"], label_type="view", label_data=sanityzed_label_data, - address=call["address"], + ### bytea + address=HexBytes(call["address"]), block_number=call["block_number"], - transaction_hash="0x", + transaction_hash="0x2653135e31407726a25dd8d304878578cdfcc7d69a2b319d1aca4a37ed66956a", block_timestamp=call["block_timestamp"], block_hash=call["block_hash"], ) From a558a15929b9ee5c35598dd0f674bc57f20bfbd3 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 26 Nov 2024 17:05:50 +0200 Subject: [PATCH 03/10] Update version. 
--- .../mooncrawl/mooncrawl/state_crawler/cli.py | 51 ------------------- crawlers/mooncrawl/mooncrawl/version.py | 2 +- 2 files changed, 1 insertion(+), 52 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 3dd10492..f244b577 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -485,14 +485,6 @@ def recursive_unpack( level = 0 calls.setdefault(level, []).append(abi) - # if not contracts_methods.get(job["address"]): - # contracts_methods[job["address"]] = [] - # if generated_hash not in contracts_methods[job["address"]]: - # contracts_methods[job["address"]].append(generated_hash) - # if not contracts_ABIs.get(job["address"]): - # contracts_ABIs[job["address"]] = {} - # contracts_ABIs[job["address"]][generated_hash] = abi - contracts_methods.setdefault(method_abi["address"], []) if generated_hash not in contracts_methods[method_abi["address"]]: contracts_methods[method_abi["address"]].append(generated_hash) @@ -969,49 +961,6 @@ def main() -> None: ) generate_view_parser.set_defaults(func=parse_abi) - # generate_view_parser = subparsers.add_parser( - # "crawl-jobs-v3", - # help="continuous crawling the view methods from job structure", - # ) - - # generate_view_parser.add_argument( - # "--moonstream-token", - # "-t", - # type=str, - # help="Moonstream token", - # required=True, - # ) - # generate_view_parser.add_argument( - # "--blockchain", - # "-b", - # type=str, - # help="Type of blovkchain wich writng in database", - # required=True, - # ) - # generate_view_parser.add_argument( - # "--infura", - # action="store_true", - # help="Use infura as web3 provider", - # ) - # generate_view_parser.add_argument( - # "--block-number", "-N", type=str, help="Block number." - # ) - # generate_view_parser.add_argument( - # "--jobs-file", - # "-j", - # type=str, - # help="Path to json file with jobs", - # required=False, - # ) - # generate_view_parser.add_argument( - # "--batch-size", - # "-s", - # type=int, - # default=500, - # help="Size of chunks wich send to Multicall2 contract.", - # ) - # generate_view_parser.set_defaults(func=handle_crawl_v3) - args = parser.parse_args() args.func(args) diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index d5b0894c..40646fd2 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. """ -MOONCRAWL_VERSION = "0.4.13" +MOONCRAWL_VERSION = "0.5.0" From e5482f7cded522422922962660c07140683356d7 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 26 Nov 2024 17:11:29 +0200 Subject: [PATCH 04/10] Remove comments. 
--- .../mooncrawl/mooncrawl/state_crawler/db.py | 52 ------------------- 1 file changed, 52 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/db.py b/crawlers/mooncrawl/mooncrawl/state_crawler/db.py index b2349bc9..9c24d8a2 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/db.py @@ -44,58 +44,6 @@ def view_call_to_label( del sanityzed_label_data["type"] del sanityzed_label_data["name"] - # class EvmBasedLabel(Base): # type: ignore - # __abstract__ = True - - # id = Column( - # UUID(as_uuid=True), - # primary_key=True, - # default=uuid.uuid4, - # unique=True, - # nullable=False, - # ) - # label = Column(VARCHAR(256), nullable=False, index=True) - - # transaction_hash = Column( - # VARCHAR(128), - # nullable=False, - # index=True, - # ) - # log_index = Column(Integer, nullable=True) - - # block_number = Column( - # BigInteger, - # nullable=False, - # index=True, - # ) - # block_hash = Column(VARCHAR(256), nullable=False) - # block_timestamp = Column(BigInteger, nullable=False) - - # caller_address = Column( - # LargeBinary, - # nullable=True, - # index=True, - # ) - # origin_address = Column( - # LargeBinary, - # nullable=True, - # index=True, - # ) - - # address = Column( - # LargeBinary, - # nullable=False, - # index=True, - # ) - - # label_name = Column(Text, nullable=True, index=True) - # label_type = Column(VARCHAR(64), nullable=True, index=True) - # label_data = Column(JSONB, nullable=True) - - # created_at = Column( - # DateTime(timezone=True), server_default=utcnow(), nullable=False - # ) - ## add zero transaction hash label = label_model( From 0a3dd64708299b4895771c05ba190e0f91e14813 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 26 Nov 2024 17:13:06 +0200 Subject: [PATCH 05/10] Remove api changes. --- moonstreamapi/moonstreamapi/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moonstreamapi/moonstreamapi/version.py b/moonstreamapi/moonstreamapi/version.py index 932ff953..304be8c8 100644 --- a/moonstreamapi/moonstreamapi/version.py +++ b/moonstreamapi/moonstreamapi/version.py @@ -2,4 +2,4 @@ Moonstream library and API version. """ -MOONSTREAMAPI_VERSION = "0.4.11" +MOONSTREAMAPI_VERSION = "0.4.10" From a230798b0808cdea943d15364a5bb583b505c4d1 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 27 Nov 2024 17:42:21 +0200 Subject: [PATCH 06/10] Bump version. --- crawlers/mooncrawl/mooncrawl/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index 40646fd2..3592a458 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. """ -MOONCRAWL_VERSION = "0.5.0" +MOONCRAWL_VERSION = "0.5.1" From 28e61636f47d8914cbe4587be637cb3dc2ab787e Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 17 Dec 2024 12:46:32 +0200 Subject: [PATCH 07/10] Update address field logic. Add env MOONSTREAM_DB_V3_CONTROLLER_API. 
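The DB v3 controller endpoint becomes configurable through the environment instead of being hard-coded. A minimal sketch of how the two pieces fit together (both lines are condensed from the diff below; the default base URL is unchanged):

    # settings.py: resolve the controller API base once, with an env override
    MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
        "MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to"
    )

    # state_crawler/cli.py: request_connection_string() builds the per-customer creds URL from it
    url = f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/seer/url"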
--- crawlers/mooncrawl/mooncrawl/settings.py | 28 ++++++++---- .../mooncrawl/mooncrawl/state_crawler/cli.py | 45 ++++++++++++++++++- 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 20cf9253..f477c4af 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -5,19 +5,14 @@ from bugout.app import Bugout from moonstreamtypes.blockchain import AvailableBlockchainType -# Bugout + +# APIs +## Bugout BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev") BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev") bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL) - -MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to") -MOONSTREAM_ENGINE_URL = os.environ.get( - "MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to" -) - - BUGOUT_REQUEST_TIMEOUT_SECONDS_RAW = os.environ.get( "MOONSTREAM_BUGOUT_TIMEOUT_SECONDS", 30 ) @@ -31,6 +26,22 @@ HUMBUG_REPORTER_CRAWLERS_TOKEN = os.environ.get("HUMBUG_REPORTER_CRAWLERS_TOKEN") + +## Moonstream +MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to") + +## Moonstream Engine +MOONSTREAM_ENGINE_URL = os.environ.get( + "MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to" +) + +## Moonstream DB +MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get( + "MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to" +) + + + # Origin RAW_ORIGINS = os.environ.get("MOONSTREAM_CORS_ALLOWED_ORIGINS") if RAW_ORIGINS is None: @@ -490,3 +501,4 @@ MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get( "MOONSTREAM_DB_V3_SCHEMA_NAME", "blockchain" ) + diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index f244b577..e5b81418 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -27,6 +27,7 @@ multicall_contracts, MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_STATE_CRAWLER_JOURNAL_ID, + MOONSTREAM_DB_V3_CONTROLLER_API ) from .db import clean_labels, commit_session, view_call_to_label from .Multicall2_interface import Contract as Multicall2 @@ -44,7 +45,7 @@ def request_connection_string(customer_id: str, instance_id: int, token: str) -> Request connection string from the Moonstream API. 
""" response = requests.get( - f"https://mdb-v3-api.moonstream.to/customers/{customer_id}/instances/{instance_id}/creds/seer/url", + f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/seer/url", headers={"Authorization": f"Bearer {token}"}, ) @@ -510,6 +511,29 @@ def build_interfaces( return interfaces +def process_address_field(job: Dict[str, Any], moonstream_token: str) -> List[str]: + """Processes the address field of a job and returns a list of addresses.""" + if isinstance(job["address"], str): + return [Web3.toChecksumAddress(job["address"])] + elif isinstance(job["address"], list): + return [Web3.toChecksumAddress(address) for address in job["address"]] # manual job multiplication + elif isinstance(job["address"], dict): + if job["address"].get("type") == "queryAPI": + # QueryAPI job multiplication + addresses = execute_query(job["address"], token=moonstream_token) + for address in addresses: + try: + Web3.toChecksumAddress(address) + except Exception as e: + logger.error(f"Invalid address: {address}") + continue + return addresses + else: + raise ValueError(f"Invalid address type: {type(job['address'])}") + else: + raise ValueError(f"Invalid address type: {type(job['address'])}") + + def parse_jobs( jobs: List[Any], blockchain_type: Any, @@ -542,11 +566,27 @@ def parse_jobs( # All sessions are stored in the dictionary db_sessions # Under one try block try: - # Process jobs and create sessions + # Process jobs and create session + for job in jobs: + + ### process address field + ### Handle case when 1 job represents multiple contracts + addresses = process_address_field(job, moonstream_token) + + for address in addresses[1:]: + new_job = job.copy() + new_job["address"] = address + jobs.append(new_job) + + job["address"] = addresses[0] + + v3 = job.get("v3", False) customer_id = job.get("customer_id") instance_id = job.get("instance_id") + + ### DB sessions if customer_db_uri is not None: if v3 and (customer_id, instance_id) not in db_sessions: # Create session @@ -581,6 +621,7 @@ def parse_jobs( if "v2" not in db_sessions: db_sessions["v2"] = PrePing_SessionLocal() + if job["address"] not in contracts_ABIs: contracts_ABIs[job["address"]] = {} From 312931b4b470b5632d11e5879c12bcb168458751 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 17 Dec 2024 13:33:51 +0200 Subject: [PATCH 08/10] Update model import. 
--- crawlers/mooncrawl/mooncrawl/actions.py | 1 + crawlers/mooncrawl/mooncrawl/api.py | 13 ++++++++++--- crawlers/mooncrawl/mooncrawl/settings.py | 3 +-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index ba6cf868..3ceb70f3 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -145,6 +145,7 @@ def recive_S3_data_from_query( json=json, timeout=5, ) + print(response.json()) data_url = MoonstreamQueryResultUrl(url=response.json()["url"]) else: data_url = client.exec_query( diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py index 615d2805..9feb83b2 100644 --- a/crawlers/mooncrawl/mooncrawl/api.py +++ b/crawlers/mooncrawl/mooncrawl/api.py @@ -13,7 +13,7 @@ from bugout.data import BugoutJournalEntity, BugoutResource from fastapi import BackgroundTasks, FastAPI from fastapi.middleware.cors import CORSMiddleware -from moonstreamdb.blockchain import ( +from moonstreamtypes.blockchain import ( AvailableBlockchainType, get_block_model, get_label_model, @@ -231,6 +231,7 @@ async def queries_data_update_handler( raise MoonstreamHTTPException(status_code=500) requested_query = request_data.query + labels_version = 2 blockchain_table = "polygon_labels" if request_data.blockchain: @@ -240,6 +241,12 @@ async def queries_data_update_handler( blockchain = AvailableBlockchainType(request_data.blockchain) + if ( + request_data.customer_id is not None + and request_data.instance_id is not None + ): + labels_version = 3 + requested_query = ( requested_query.replace( "__transactions_table__", @@ -251,11 +258,11 @@ async def queries_data_update_handler( ) .replace( "__labels_table__", - get_label_model(blockchain).__tablename__, + get_label_model(blockchain, labels_version).__tablename__, ) ) - blockchain_table = get_label_model(blockchain).__tablename__ + blockchain_table = get_label_model(blockchain, labels_version).__tablename__ # Check if it can transform to TextClause try: diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index f477c4af..18e3810d 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -41,7 +41,6 @@ ) - # Origin RAW_ORIGINS = os.environ.get("MOONSTREAM_CORS_ALLOWED_ORIGINS") if RAW_ORIGINS is None: @@ -392,6 +391,7 @@ AvailableBlockchainType.BLAST: "0xcA11bde05977b3631167028862bE2a173976CA11", AvailableBlockchainType.MANTLE: "0xcA11bde05977b3631167028862bE2a173976CA11", AvailableBlockchainType.MANTLE_SEPOLIA: "0xcA11bde05977b3631167028862bE2a173976CA11", + AvailableBlockchainType.GAME7_TESTNET: "0xcA11bde05977b3631167028862bE2a173976CA11", } @@ -501,4 +501,3 @@ MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get( "MOONSTREAM_DB_V3_SCHEMA_NAME", "blockchain" ) - From a2cf503327829654362455009b1d82830b8ac1eb Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 17 Dec 2024 15:57:23 +0200 Subject: [PATCH 09/10] Update moonstream types dependency. Add state-crawler executions parameters over v3. 
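Query placeholder handling moves out of the API handler into two helpers in actions.py. A hedged usage sketch (the SELECT text and query id are made-up examples; the helper signatures match the diff below):

    # resolve_table_names() returns the v3 label table when both customer_id and
    # instance_id are set on the request, otherwise the v2 models.
    tables = resolve_table_names(request_data)
    # prepare_query() substitutes __labels_table__ / __transactions_table__ / __blocks_table__
    # and returns a sqlalchemy TextClause, raising QueryTextClauseException on parse failure.
    query = prepare_query("SELECT label_data FROM __labels_table__ LIMIT 10", tables, "example-query-id")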
--- crawlers/mooncrawl/mooncrawl/actions.py | 89 ++++++++++++++++++- crawlers/mooncrawl/mooncrawl/api.py | 64 ++++--------- .../mooncrawl/mooncrawl/state_crawler/cli.py | 36 +++++--- crawlers/mooncrawl/setup.py | 2 +- moonstreamapi/moonstreamapi/routes/queries.py | 2 +- moonstreamapi/requirements.txt | 1 + moonstreamapi/setup.py | 1 + 7 files changed, 132 insertions(+), 63 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index 3ceb70f3..f62b66b4 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -16,7 +16,15 @@ Moonstream, MoonstreamQueryResultUrl, ) +from sqlalchemy import text, TextClause +from moonstreamtypes.blockchain import ( + AvailableBlockchainType, + get_block_model, + get_label_model, + get_transaction_model, +) +from .data import QueryDataUpdate from .middleware import MoonstreamHTTPException from .settings import ( bugout_client as bc, @@ -34,6 +42,12 @@ class EntityCollectionNotFoundException(Exception): """ +class QueryTextClauseException(Exception): + """ + Raised when query can't be transformed to TextClause + """ + + def push_data_to_bucket( data: Any, key: str, bucket: str, metadata: Dict[str, Any] = {} ) -> None: @@ -120,6 +134,7 @@ def recive_S3_data_from_query( time_await: int = 2, max_retries: int = 30, custom_body: Optional[Dict[str, Any]] = None, + customer_params: Optional[Dict[str, Any]] = {}, ) -> Any: """ Await the query to be update data on S3 with if_modified_since and return new the data. @@ -133,7 +148,7 @@ def recive_S3_data_from_query( if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT") time.sleep(2) - if custom_body: + if custom_body or customer_params: headers = { "Authorization": f"Bearer {token}", } @@ -142,6 +157,7 @@ def recive_S3_data_from_query( response = requests.post( url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data", headers=headers, + params=customer_params, json=json, timeout=5, ) @@ -227,3 +243,74 @@ def get_customer_db_uri( except Exception as e: logger.error(f"Error get customer db uri: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) + + +def resolve_table_names(request_data: QueryDataUpdate) -> Dict[str, str]: + """ + Determines the table names based on the blockchain and labels version. + Returns an empty dictionary if blockchain is not provided. + """ + if not request_data.blockchain: + return {"labels_table": "ethereum_labels"} + + if request_data.blockchain not in [i.value for i in AvailableBlockchainType]: + logger.error(f"Unknown blockchain {request_data.blockchain}") + raise MoonstreamHTTPException(status_code=403, detail="Unknown blockchain") + + blockchain = AvailableBlockchainType(request_data.blockchain) + labels_version = 2 + + if request_data.customer_id is not None and request_data.instance_id is not None: + labels_version = 3 + + print(labels_version, blockchain) + + tables = { + "labels_table": get_label_model(blockchain, labels_version).__tablename__, + } + + if labels_version != 3: + tables.update( + { + "transactions_table": get_transaction_model(blockchain).__tablename__, + "blocks_table": get_block_model(blockchain).__tablename__, + } + ) + + return tables + + +def prepare_query( + requested_query: str, tables: Dict[str, str], query_id: str +) -> TextClause: + """ + Prepares the SQL query by replacing placeholders with actual table names. 
+ """ + # Check and replace placeholders only if they exist in the query + if "__labels_table__" in requested_query: + requested_query = requested_query.replace( + "__labels_table__", tables.get("labels_table", "ethereum_labels") + ) + + if "__transactions_table__" in requested_query and "transactions_table" in tables: + requested_query = requested_query.replace( + "__transactions_table__", tables["transactions_table"] + ) + + if "__blocks_table__" in requested_query and "blocks_table" in tables: + requested_query = requested_query.replace( + "__blocks_table__", tables["blocks_table"] + ) + + # Check if it can transform to TextClause + try: + query = text(requested_query) + except Exception as e: + logger.error( + f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}" + ) + raise QueryTextClauseException( + f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}" + ) + + return query diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py index 9feb83b2..25bac945 100644 --- a/crawlers/mooncrawl/mooncrawl/api.py +++ b/crawlers/mooncrawl/mooncrawl/api.py @@ -13,13 +13,6 @@ from bugout.data import BugoutJournalEntity, BugoutResource from fastapi import BackgroundTasks, FastAPI from fastapi.middleware.cors import CORSMiddleware -from moonstreamtypes.blockchain import ( - AvailableBlockchainType, - get_block_model, - get_label_model, - get_transaction_model, -) -from sqlalchemy import text from . import data from .actions import ( @@ -27,6 +20,9 @@ generate_s3_access_links, get_entity_subscription_collection_id, query_parameter_hash, + prepare_query, + resolve_table_names, + QueryTextClauseException, ) from .middleware import MoonstreamHTTPException from .settings import ( @@ -230,50 +226,20 @@ async def queries_data_update_handler( logger.error(f"Unhandled query execute exception, error: {e}") raise MoonstreamHTTPException(status_code=500) - requested_query = request_data.query - labels_version = 2 - - blockchain_table = "polygon_labels" - if request_data.blockchain: - if request_data.blockchain not in [i.value for i in AvailableBlockchainType]: - logger.error(f"Unknown blockchain {request_data.blockchain}") - raise MoonstreamHTTPException(status_code=403, detail="Unknown blockchain") - - blockchain = AvailableBlockchainType(request_data.blockchain) - - if ( - request_data.customer_id is not None - and request_data.instance_id is not None - ): - labels_version = 3 - - requested_query = ( - requested_query.replace( - "__transactions_table__", - get_transaction_model(blockchain).__tablename__, - ) - .replace( - "__blocks_table__", - get_block_model(blockchain).__tablename__, - ) - .replace( - "__labels_table__", - get_label_model(blockchain, labels_version).__tablename__, - ) - ) - - blockchain_table = get_label_model(blockchain, labels_version).__tablename__ + # Resolve table names based on the request data default ethereum + tables = resolve_table_names(request_data) - # Check if it can transform to TextClause + # Prepare the query with the resolved table names try: - query = text(requested_query) + query = prepare_query(request_data.query, tables, query_id) + except QueryTextClauseException as e: + logger.error(f"Error preparing query for query id: {query_id}, error: {e}") + raise MoonstreamHTTPException(status_code=500, detail="Error preparing query") except Exception as e: - logger.error( - f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}" - ) - raise 
MoonstreamHTTPException(status_code=500, detail="Can't parse query") + logger.error(f"Error preparing query for query id: {query_id}, error: {e}") + raise MoonstreamHTTPException(status_code=500, detail="Error preparing query") - # Get requried keys for query + # Get required keys for query expected_query_parameters = query._bindparams.keys() # request.params validations @@ -308,9 +274,9 @@ async def queries_data_update_handler( params_hash=params_hash, customer_id=request_data.customer_id, instance_id=request_data.instance_id, - blockchain_table=blockchain_table, + blockchain_table=tables["labels_table"], + # Add any additional parameters needed for the task ) - except Exception as e: logger.error(f"Unhandled query execute exception, error: {e}") raise MoonstreamHTTPException(status_code=500) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index e5b81418..fffeb4f5 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -27,7 +27,7 @@ multicall_contracts, MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_STATE_CRAWLER_JOURNAL_ID, - MOONSTREAM_DB_V3_CONTROLLER_API + MOONSTREAM_DB_V3_CONTROLLER_API, ) from .db import clean_labels, commit_session, view_call_to_label from .Multicall2_interface import Contract as Multicall2 @@ -40,21 +40,26 @@ client = Moonstream() -def request_connection_string(customer_id: str, instance_id: int, token: str) -> str: +def request_connection_string( + customer_id: str, + instance_id: int, + token: str, + user: str = "seer", # token with write access +) -> str: """ Request connection string from the Moonstream API. """ response = requests.get( - f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/seer/url", + f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url", headers={"Authorization": f"Bearer {token}"}, ) response.raise_for_status() - return response.text + return response.text.replace('"', "") -def execute_query(query: Dict[str, Any], token: str): +def execute_query(query: Dict[str, Any], token: str) -> Any: """ Query task example: @@ -72,6 +77,8 @@ def execute_query(query: Dict[str, Any], token: str): """ + print(f"Executing query: {query}") + # get the query url query_url = query["query_url"] @@ -82,6 +89,11 @@ def execute_query(query: Dict[str, Any], token: str): params = query["params"] body = {"params": params} + query_params = dict() + + if query.get("customer_id") and query.get("instance_id"): + query_params["customer_id"] = query["customer_id"] + query_params["instance_id"] = query["instance_id"] if blockchain: body["blockchain"] = blockchain @@ -93,6 +105,7 @@ def execute_query(query: Dict[str, Any], token: str): token=token, query_name=query_url, custom_body=body, + customer_params=query_params, ) # extract the keys as a list @@ -516,18 +529,21 @@ def process_address_field(job: Dict[str, Any], moonstream_token: str) -> List[st if isinstance(job["address"], str): return [Web3.toChecksumAddress(job["address"])] elif isinstance(job["address"], list): - return [Web3.toChecksumAddress(address) for address in job["address"]] # manual job multiplication + return [ + Web3.toChecksumAddress(address) for address in job["address"] + ] # manual job multiplication elif isinstance(job["address"], dict): if job["address"].get("type") == "queryAPI": # QueryAPI job multiplication addresses = execute_query(job["address"], token=moonstream_token) + 
checsum_addresses = [] for address in addresses: try: - Web3.toChecksumAddress(address) + checsum_addresses.append(Web3.toChecksumAddress(address)) except Exception as e: - logger.error(f"Invalid address: {address}") + logger.error(f"Invalid address: {address}") continue - return addresses + return checsum_addresses else: raise ValueError(f"Invalid address type: {type(job['address'])}") else: @@ -581,7 +597,6 @@ def parse_jobs( job["address"] = addresses[0] - v3 = job.get("v3", False) customer_id = job.get("customer_id") instance_id = job.get("instance_id") @@ -621,7 +636,6 @@ def parse_jobs( if "v2" not in db_sessions: db_sessions["v2"] = PrePing_SessionLocal() - if job["address"] not in contracts_ABIs: contracts_ABIs[job["address"]] = {} diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py index 5e715faa..f009edfc 100644 --- a/crawlers/mooncrawl/setup.py +++ b/crawlers/mooncrawl/setup.py @@ -39,7 +39,7 @@ "fastapi", "moonstreamdb>=0.4.5", "moonstreamdb-v3>=0.0.16", - "moonstream-types>=0.0.6", + "moonstream-types>=0.0.7", "moonstream>=0.1.1", "moonworm[moonstream]>=0.9.3", "humbug", diff --git a/moonstreamapi/moonstreamapi/routes/queries.py b/moonstreamapi/moonstreamapi/routes/queries.py index c1388beb..e9f05216 100644 --- a/moonstreamapi/moonstreamapi/routes/queries.py +++ b/moonstreamapi/moonstreamapi/routes/queries.py @@ -16,7 +16,7 @@ ) from bugout.exceptions import BugoutResponseException from fastapi import APIRouter, Body, Path, Query, Request -from moonstreamdb.blockchain import AvailableBlockchainType +from moonstreamtypes.blockchain import AvailableBlockchainType from sqlalchemy import text from .. import data diff --git a/moonstreamapi/requirements.txt b/moonstreamapi/requirements.txt index f5cde3b8..c79bb7ca 100644 --- a/moonstreamapi/requirements.txt +++ b/moonstreamapi/requirements.txt @@ -39,6 +39,7 @@ MarkupSafe==2.1.1 moonstream==0.1.1 moonstreamdb==0.4.5 moonstreamdb-v3==0.0.18 +moonstreamtypes==0.0.6 multiaddr==0.0.9 multidict==6.0.2 netaddr==0.8.0 diff --git a/moonstreamapi/setup.py b/moonstreamapi/setup.py index ce4f3861..bc9f7d24 100644 --- a/moonstreamapi/setup.py +++ b/moonstreamapi/setup.py @@ -18,6 +18,7 @@ "moonstream", "moonstreamdb>=0.4.5", "moonstreamdb-v3>=0.0.18", + "moonstream-types>=0.0.7", "humbug", "pydantic==1.10.2", "pyevmasm", From b6d243eef3fe16451350b25fb24d5553f0c0a8eb Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 20 Dec 2024 17:23:35 +0200 Subject: [PATCH 10/10] Add required fixes for v3 databases. 
--- crawlers/mooncrawl/mooncrawl/state_crawler/cli.py | 1 + crawlers/mooncrawl/setup.py | 2 +- moonstreamapi/requirements.txt | 2 +- moonstreamapi/setup.py | 4 ++-- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index fffeb4f5..4b2a5a8b 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -195,6 +195,7 @@ def process_multicall_result( "call_data": multicall_calls[index][1], "block_number": block_number, "block_timestamp": block_timestamp, + "block_hash": block_hash, "status": encoded_data[0], "error": str(e), "v3": call.get("v3", False), diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py index f009edfc..0102bd69 100644 --- a/crawlers/mooncrawl/setup.py +++ b/crawlers/mooncrawl/setup.py @@ -39,7 +39,7 @@ "fastapi", "moonstreamdb>=0.4.5", "moonstreamdb-v3>=0.0.16", - "moonstream-types>=0.0.7", + "moonstream-types>=0.0.9", "moonstream>=0.1.1", "moonworm[moonstream]>=0.9.3", "humbug", diff --git a/moonstreamapi/requirements.txt b/moonstreamapi/requirements.txt index c79bb7ca..9399bee5 100644 --- a/moonstreamapi/requirements.txt +++ b/moonstreamapi/requirements.txt @@ -39,7 +39,7 @@ MarkupSafe==2.1.1 moonstream==0.1.1 moonstreamdb==0.4.5 moonstreamdb-v3==0.0.18 -moonstreamtypes==0.0.6 +moonstreamtypes==0.0.9 multiaddr==0.0.9 multidict==6.0.2 netaddr==0.8.0 diff --git a/moonstreamapi/setup.py b/moonstreamapi/setup.py index bc9f7d24..47a25bff 100644 --- a/moonstreamapi/setup.py +++ b/moonstreamapi/setup.py @@ -17,8 +17,8 @@ "fastapi", "moonstream", "moonstreamdb>=0.4.5", - "moonstreamdb-v3>=0.0.18", - "moonstream-types>=0.0.7", + "moonstreamdb-v3>=0.1.2", + "moonstream-types>=0.0.9", "humbug", "pydantic==1.10.2", "pyevmasm",
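
For reference, a minimal sketch (not part of the patches above) of how the per-customer v3 database flow can be exercised: it fetches a connection string the same way request_connection_string does in the state crawler and opens one SQLAlchemy session per (customer_id, instance_id) pair, mirroring how parse_jobs caches sessions in db_sessions next to the default "v2" PrePing_SessionLocal session. The controller URL constant, the pool settings, and the open_customer_session helper are illustrative assumptions, not existing project code.

    import requests
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    # Stand-in for MOONSTREAM_DB_V3_CONTROLLER_API; illustrative only.
    CONTROLLER_API = "https://example-controller.invalid"

    def fetch_connection_string(customer_id: str, instance_id: int, token: str, user: str = "seer") -> str:
        # Same endpoint shape as request_connection_string; the controller
        # returns the URI wrapped in quotes, so they are stripped before use.
        response = requests.get(
            f"{CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url",
            headers={"Authorization": f"Bearer {token}"},
        )
        response.raise_for_status()
        return response.text.replace('"', "")

    def open_customer_session(uri: str):
        # Hypothetical helper: one engine/session per (customer_id, instance_id),
        # cached by the caller; pool_pre_ping guards long-running crawls.
        engine = create_engine(uri, pool_pre_ping=True)
        return sessionmaker(bind=engine)()

A crawl would call fetch_connection_string once per (customer_id, instance_id) pair found in the jobs, cache the resulting session alongside the default "v2" session, and commit labels through whichever session matches the job.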
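
Similarly, a condensed approximation of the table-resolution and query-preparation flow that patch 09 moves into actions.py: customer_id together with instance_id selects labels version 3, otherwise version 2, and placeholders are substituted only when present before the query is wrapped in a TextClause. The LABEL_TABLES dict below is a hypothetical stand-in for the moonstreamtypes get_label_model lookup, used here only to keep the sketch self-contained.

    from typing import Dict, Optional
    from sqlalchemy import text, TextClause

    # Hypothetical stand-in for get_label_model(blockchain, labels_version).__tablename__.
    LABEL_TABLES: Dict[tuple, str] = {
        ("polygon", 2): "polygon_labels",
        ("polygon", 3): "labels",
    }

    def resolve_tables(
        blockchain: Optional[str], customer_id: Optional[str], instance_id: Optional[str]
    ) -> Dict[str, str]:
        # No blockchain in the request -> fall back to ethereum_labels,
        # as resolve_table_names does.
        if not blockchain:
            return {"labels_table": "ethereum_labels"}
        labels_version = 3 if (customer_id is not None and instance_id is not None) else 2
        return {"labels_table": LABEL_TABLES[(blockchain, labels_version)]}

    def prepare(requested_query: str, tables: Dict[str, str]) -> TextClause:
        # Replace the placeholder only if the query actually uses it, then wrap
        # the result in a TextClause so bind parameters can be validated by the caller.
        if "__labels_table__" in requested_query:
            requested_query = requested_query.replace("__labels_table__", tables["labels_table"])
        return text(requested_query)

    query = prepare(
        "SELECT count(*) FROM __labels_table__",
        resolve_tables("polygon", "customer-uuid", "instance-1"),
    )

With this split, the API handler only resolves tables, prepares the TextClause, and hands the resolved labels table name to the background task, instead of doing the placeholder substitution inline.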