Skip to content

Commit

Permalink
feat: events
Browse files Browse the repository at this point in the history
  • Loading branch information
wavey0x committed Dec 28, 2022
1 parent ecf8762 commit aa79c97
Showing 1 changed file with 52 additions and 16 deletions.
68 changes: 52 additions & 16 deletions yearn/events.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
import logging
from collections import Counter, defaultdict
from itertools import zip_longest
from typing import Any, Dict, List, Optional, Union

from brownie import web3, chain
from brownie.network.event import EventDict, _decode_logs, _add_deployment_topics
from brownie import chain, web3
from brownie.exceptions import ContractNotFound
from brownie.network.event import (EventDict, _add_deployment_topics,
_decode_logs)
from joblib import Parallel, delayed
from toolz import groupby
from web3.middleware.filter import block_ranges
from web3.types import LogReceipt, RPCEndpoint

from yearn.middleware.middleware import BATCH_SIZE
from yearn.utils import contract_creation_block, contract
from yearn.typing import Address, Block, Topics
from yearn.utils import contract, contract_creation_block

logger = logging.getLogger(__name__)


def decode_logs(logs) -> EventDict:
def decode_logs(logs: List[LogReceipt]) -> EventDict:
"""
Decode logs to events and enrich them with additional info.
"""
addresses = {log.address for log in logs}
for address in addresses:
__add_deployment_topics(address)

decoded = _decode_logs(logs)
for i, log in enumerate(logs):
setattr(decoded[i], "block_number", log["blockNumber"])
Expand All @@ -26,40 +35,53 @@ def decode_logs(logs) -> EventDict:
return decoded


def create_filter(address, topics=None):
def create_filter(address: Address, topics: Optional[Topics] = None) -> RPCEndpoint:
"""
Create a log filter for one or more contracts.
Set fromBlock as the earliest creation block.
"""
if isinstance(address, list):
for a in address:
add_deployment_topics(a)
start_block = min(map(contract_creation_block, address))
else:
add_deployment_topics(address)
start_block = contract_creation_block(address)

return web3.eth.filter({"address": address, "fromBlock": start_block, "topics": topics})


def add_deployment_topics(address):
_add_deployment_topics(address, contract(address).abi)
def __add_deployment_topics(address: Address) -> None:
try:
_add_deployment_topics(address, contract(address).abi)
except ContractNotFound:
# This contract seems to have self destructed
pass

def get_logs_asap(
addresses: Optional[Union[Address,List[Address]]] = None,
topics: Optional[Topics] = None,
from_block: Optional[Block] = None,
to_block: Optional[Block] = None,
verbose: int = 0
) -> List[LogReceipt]:

def get_logs_asap(address, topics, from_block=None, to_block=None, verbose=0):
logs = []

if from_block is None:
from_block = contract_creation_block(address)
if type(addresses) == list:
from_block = min(map(contract_creation_block, addresses))
elif addresses:
from_block = contract_creation_block(addresses)
else:
from_block = 0

if to_block is None:
to_block = chain.height

ranges = list(block_ranges(from_block, to_block, BATCH_SIZE[chain.id]))
ranges = list(block_ranges(from_block, to_block, BATCH_SIZE))
if verbose > 0:
logger.info('fetching %d batches', len(ranges))

batches = Parallel(8, "threading", verbose=verbose)(
delayed(web3.eth.get_logs)({"address": address, "topics": topics, "fromBlock": start, "toBlock": end})
delayed(web3.eth.get_logs)(_get_logs_params(addresses, topics, start, end))
for start, end in ranges
)
for batch in batches:
Expand All @@ -68,7 +90,7 @@ def get_logs_asap(address, topics, from_block=None, to_block=None, verbose=0):
return logs


def logs_to_balance_checkpoints(logs):
def logs_to_balance_checkpoints(logs: List[LogReceipt]) -> Dict[Address,Dict[Block,int]]:
"""
Convert Transfer logs to `{address: {from_block: balance}}` checkpoints.
"""
Expand All @@ -86,11 +108,25 @@ def logs_to_balance_checkpoints(logs):
return checkpoints


def checkpoints_to_weight(checkpoints, start_block, end_block):
def checkpoints_to_weight(checkpoints, start_block: Block, end_block: Block):
total = 0
for a, b in zip_longest(list(checkpoints), list(checkpoints)[1:]):
if a < start_block or a > end_block:
continue
b = min(b, end_block) if b else end_block
total += checkpoints[a] * (b - a) / (end_block - start_block)
return total

def _get_logs_params(
addresses: Optional[Union[Address,List[Address]]],
topics: Optional[Topics],
start: Block,
end: Block
) -> Dict[str,Any]:
params = {"fromBlock": start, "toBlock": end}
if addresses:
# pass in a list of addresses
params["address"] = addresses
if topics:
params["topics"] = topics
return params

0 comments on commit aa79c97

Please sign in to comment.