-
Notifications
You must be signed in to change notification settings - Fork 0
/
preloader.py
67 lines (56 loc) · 2.36 KB
/
preloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from redis import asyncio as aioredis
from snapshotter.utils.callback_helpers import GenericPreloader
from snapshotter.utils.default_logger import default_logger
from snapshotter.utils.models.message_models import EpochBase
from snapshotter.utils.rpc import RpcHelper
from snapshotter.utils.snapshot_utils import get_block_details_in_block_range
class BlockDetailsPreloader(GenericPreloader):
"""
A preloader class for fetching and caching block details within a specified epoch range.
This class extends GenericPreloader and implements methods to compute and cleanup
block details for a given epoch range.
"""
def __init__(self) -> None:
"""
Initialize the BlockDetailsPreloader with a logger.
"""
self._logger = default_logger.bind(module='BlockDetailsPreloader')
async def compute(
self,
epoch: EpochBase,
redis_conn: aioredis.Redis,
rpc_helper: RpcHelper,
):
"""
Compute and cache block details for the given epoch range.
Args:
epoch (EpochBase): The epoch object containing begin and end block heights.
redis_conn (aioredis.Redis): Redis connection for caching.
rpc_helper (RpcHelper): Helper object for making RPC calls.
This method fetches block details for all blocks within the epoch range
and caches them in Redis.
"""
min_chain_height = epoch.begin
max_chain_height = epoch.end
try:
# Fetch and cache block details for all blocks in the specified range
await get_block_details_in_block_range(
from_block=min_chain_height,
to_block=max_chain_height,
redis_conn=redis_conn,
rpc_helper=rpc_helper,
)
except Exception as e:
# Log any errors that occur during the process
self._logger.error(f'Error in block details preloader: {e}')
raise e
finally:
# Ensure Redis connection is closed after operation
await redis_conn.close()
async def cleanup(self):
"""
Perform any necessary cleanup operations.
This method is currently a placeholder and does not perform any actions.
It can be implemented in the future if cleanup operations are needed.
"""
pass