-
Notifications
You must be signed in to change notification settings - Fork 0
/
preloader.py
146 lines (127 loc) · 5.84 KB
/
preloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import json
from pydantic import ValidationError
from redis import asyncio as aioredis
from snapshotter.settings.config import settings
from snapshotter.utils.default_logger import default_logger
from snapshotter.utils.generic_delegator_preloader import DelegatorPreloaderAsyncWorker
from snapshotter.utils.helper_functions import preloading_entry_exit_logger
from snapshotter.utils.models.message_models import EpochBase
from snapshotter.utils.models.message_models import PowerloomDelegateTxReceiptWorkerResponseMessage
from snapshotter.utils.models.message_models import PowerloomDelegateWorkerRequestMessage
from snapshotter.utils.redis.redis_keys import epoch_txs_htable
from snapshotter.utils.rpc import RpcHelper
from snapshotter.utils.snapshot_utils import get_block_details_in_block_range
class TxPreloadWorker(DelegatorPreloaderAsyncWorker):
    """
    A worker class for preloading transaction receipts.

    Inherits from DelegatorPreloaderAsyncWorker. ``compute`` builds one
    delegate request per transaction found in the epoch's block range, the
    response handler collects receipts keyed by transaction hash, and the
    completion hook writes the collected receipts to a Redis hash table.
    """

    # Offset subtracted from the current epoch ID to select the receipts hash
    # table that ``compute`` deletes before preloading the current epoch.
    _STALE_EPOCH_OFFSET = 30

    def __init__(self) -> None:
        """
        Initialize the TxPreloadWorker.

        Sets the task type to 'txreceipt'.
        """
        super().__init__()
        self._task_type = 'txreceipt'

    async def _handle_filter_worker_response_message(self, message: bytes) -> None:
        """
        Handle the response message from the filter worker.

        Parses and validates the raw message, then records the transaction
        receipt if the message answers a request that is still awaited.
        Malformed messages, messages with an empty receipt, and messages for
        unknown (or already handled) request IDs are dropped.

        Args:
            message (bytes): The raw message received from the worker.
        """
        try:
            msg_obj: PowerloomDelegateTxReceiptWorkerResponseMessage = (
                PowerloomDelegateTxReceiptWorkerResponseMessage.parse_raw(message)
            )
        except ValidationError:
            self._logger.opt(exception=settings.logs.debug_mode).error(
                'Bad message structure of txreceiptResponse',
            )
            return
        except Exception:
            # Best-effort handler: any other parsing failure is logged and the
            # message is dropped rather than propagated.
            self._logger.opt(exception=settings.logs.debug_mode).error(
                'Unexpected message format of txreceiptResponse',
            )
            return

        if msg_obj.txReceipt is None:
            self._logger.warning(
                'Received txreceiptResponse with empty txReceipt for requestId'
                f' {msg_obj.requestId} for epoch {msg_obj.epochId}',
            )
            return

        # Fast path under the reader lock: most unrelated messages are
        # rejected here without contending for the writer lock.
        async with self._rw_lock.reader_lock:
            if msg_obj.requestId not in self._awaited_delegated_response_ids:
                # Uncomment for debugging purposes
                # self._logger.warning(
                #     f'Received txreceiptResponse for unknown requestId {msg_obj.requestId} for epoch {msg_obj.epochId}',
                # )
                # self._logger.warning(
                #     'Known requestIds for epoch '
                #     f'{msg_obj.epochId}: {self._awaited_delegated_response_ids}',
                # )
                return

        async with self._rw_lock.writer_lock:
            # The reader lock was released above, so another handler may have
            # consumed this requestId in the meantime (e.g. a duplicate
            # response). Re-check before removing so remove() cannot raise.
            if msg_obj.requestId not in self._awaited_delegated_response_ids:
                return
            self._awaited_delegated_response_ids.remove(msg_obj.requestId)
            self._collected_response_objects[msg_obj.txHash] = msg_obj.txReceipt

    async def _on_delegated_responses_complete(self) -> None:
        """
        Handle the completion of all delegated responses.

        Stores the collected transaction receipts in the Redis hash table for
        the current epoch, one field per transaction hash with the receipt
        serialized as JSON. Nothing is written when no receipts were collected.
        """
        if not self._collected_response_objects:
            return

        await self._redis_conn.hset(
            name=epoch_txs_htable(epoch_id=self._epoch.epochId),
            mapping={
                tx_hash: json.dumps(receipt)
                for tx_hash, receipt in self._collected_response_objects.items()
            },
        )

    @preloading_entry_exit_logger
    async def compute(self, epoch: EpochBase, redis_conn: aioredis.Redis, rpc_helper: RpcHelper):
        """
        Compute method to preload transaction receipts for a given epoch.

        Args:
            epoch (EpochBase): The epoch for which to preload transaction receipts.
            redis_conn (aioredis.Redis): Redis connection object.
            rpc_helper (RpcHelper): RPC helper object for blockchain interactions.

        Returns:
            Whatever the parent class's ``compute_with_retry`` returns.

        This method performs the following steps:
        1. Set up logging and store epoch and Redis connection.
        2. Clean up old data from Redis.
        3. Fetch block details for the epoch's block range.
        4. Extract transactions from the blocks.
        5. Create worker request messages for each transaction.
        6. Delegate the computation to the parent class method.
        """
        self._logger = default_logger.bind(module='TxPreloadWorker')
        self._epoch = epoch
        self._redis_conn = redis_conn

        # Clean up hset for (current epoch - _STALE_EPOCH_OFFSET) if it exists
        await self._redis_conn.delete(
            epoch_txs_htable(epoch_id=self._epoch.epochId - self._STALE_EPOCH_OFFSET),
        )

        block_details = await get_block_details_in_block_range(
            from_block=epoch.begin,
            to_block=epoch.end,
            redis_conn=redis_conn,
            rpc_helper=rpc_helper,
        )

        # Flatten the per-block transaction lists into one list, preserving
        # the iteration order of block_details.
        tx_list = []
        for block in block_details.values():
            tx_list.extend(block['transactions'])

        # Create worker request messages for each transaction; request IDs are
        # 1-based positions in tx_list.
        tx_receipt_query_messages = [
            PowerloomDelegateWorkerRequestMessage(
                epochId=epoch.epochId,
                extra={'tx_hash': tx_hash},
                requestId=idx + 1,
                task_type=self._task_type,
            )
            for idx, tx_hash in enumerate(tx_list)
        ]

        self._request_id_query_obj_map = {
            msg_obj.requestId: msg_obj
            for msg_obj in tx_receipt_query_messages
        }

        # Delegate the computation to the parent class method
        return await super().compute_with_retry(epoch, redis_conn, rpc_helper)