Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small fixes and cleanup #79

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
13 changes: 0 additions & 13 deletions src/helpers/blockchain_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,6 @@ def get_transaction_timestamp(self, tx_hash: str) -> tuple[str, int]:

return tx_hash, timestamp

def get_transaction_tokens(self, tx_hash: str) -> list[tuple[str, str]]:
receipt = self.web3.eth.get_transaction_receipt(HexStr(tx_hash))

transfer_topic = self.web3.keccak(text="Transfer(address,address,uint256)")

token_addresses: set[str] = set()
for log in receipt["logs"]:
if log["topics"] and log["topics"][0] == transfer_topic:
token_address = log["address"]
token_addresses.add(token_address)

return [(tx_hash, token_address) for token_address in token_addresses]

def get_token_decimals(self, token_address: str) -> int:
"""Get number of decimals for a token."""
contract = self.web3.eth.contract(
Expand Down
18 changes: 18 additions & 0 deletions src/helpers/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ def execute_and_commit(self, query: str, params: dict):
connection.rollback()
raise

def get_price(self, token_address, time, source):
query = "SELECT * FROM prices WHERE token_address = :token_address AND time = :time AND source = :source"
result = self.execute_query(
query,
{
"token_address": bytes.fromhex(token_address[2:]),
"time": datetime.fromtimestamp(time, tz=timezone.utc),
"source": source,
},
)
return result.fetchone()

def write_token_imbalances(
self,
tx_hash: str,
Expand Down Expand Up @@ -173,6 +185,12 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None:
)
for token_address, time, price, source in prices:
try:
if self.get_price(token_address, time, source) is not None:
logger.info(
"Skipping INSERT operation as entry already exists in PRICES table."
)
continue

self.execute_and_commit(
query,
{
Expand Down
19 changes: 7 additions & 12 deletions src/transaction_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,25 +118,19 @@ def process_single_transaction(
"""Function processes a single tx to find imbalances, fees, prices including writing to database."""
self.log_message = []
try:
# compute raw token imbalances
token_imbalances = self.process_token_imbalances(
tx_hash, auction_id, block_number
)

# get transaction timestamp
transaction_timestamp = self.blockchain_data.get_transaction_timestamp(
tx_hash
)
# store transaction timestamp
self.db.write_transaction_timestamp(transaction_timestamp)

# get transaction tokens
# transaction_tokens = self.blockchain_data.get_transaction_tokens(tx_hash)
# store transaction tokens
# get transaction tokens by first computing raw token imbalances
token_imbalances = self.process_token_imbalances(tx_hash)
transaction_tokens = []
for token_address, imbalance in token_imbalances.items():
if imbalance != 0:
transaction_tokens.append((tx_hash, token_address))
for token_address in token_imbalances.keys():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would reorganize code here.

Instead of fetching token_imbalances in the beginning, and then having this loop here, both should be moved into a self.imbalances.get_transaction_tokens function. That function would then be called here. The function self.blockchain_data.get_transaction_tokens should then be deleted.

Copy link
Contributor Author

@harisang harisang Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it is still not clear where and how the raw imbalances will be used, i decided to keep it separate and not part of a get_transaction_tokens function.

Rearranged a few things and modified the test as well (not sure if you like how it looks like), in order to check the current version of the code. Ended up removing the test as it wasn't easy to make it work

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One reason for separating functionality into functions is that it makes it possible to test things.

Moving a function into the main body and removing tests is not ideal.

I can handle moving the function into token imbalances if you do not have the capacity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok i am attempting to do this now

transaction_tokens.append((tx_hash, token_address))
# store transaction tokens
self.db.write_transaction_tokens(transaction_tokens)

# update token decimals
Expand Down Expand Up @@ -202,7 +196,8 @@ def process_single_transaction(
return

def process_token_imbalances(
self, tx_hash: str, auction_id: int, block_number: int
self,
tx_hash: str,
) -> dict[str, int]:
"""Process token imbalances for a given transaction and return imbalances."""
try:
Expand Down
19 changes: 0 additions & 19 deletions tests/e2e/test_blockchain_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,3 @@ def test_get_transaction_timestamp():
transaction_timestamp = blockchain.get_transaction_timestamp(tx_hash)

assert transaction_timestamp == (tx_hash, 1728044411)


def test_get_transaction_tokens():
web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL")))
blockchain = BlockchainData(web3)
tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c"

transaction_tokens = blockchain.get_transaction_tokens(tx_hash)

assert all(h == tx_hash for h, _ in transaction_tokens)
assert set(token_address for _, token_address in transaction_tokens) == {
"0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9",
"0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee",
"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
"0xdAC17F958D2ee523a2206206994597C13D831ec7",
"0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637",
"0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9",
}
42 changes: 41 additions & 1 deletion tests/unit/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,47 @@ def tests_write_prices():
).all()
for i, (token_address, time, price, source) in enumerate(token_prices):
assert HexBytes(res[i][0]) == HexBytes(token_address)
assert res[i][1].timestamp() == time
assert res[i][1].replace(tzinfo=timezone.utc).timestamp() == time
assert float(res[i][2]) == price
assert res[i][3] == source


def tests_write_duplicate_prices():
engine = create_engine(
f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet"
)
db = Database(engine, "mainnet")
# list contains duplicate entry in order to test how this is handled
token_prices = [
(
"0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48",
int(datetime.fromisoformat("2024-10-10 16:48:47.000000").timestamp()),
0.000420454193230350,
"coingecko",
),
(
"0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48",
int(datetime.fromisoformat("2024-10-10 16:48:47.000000").timestamp()),
0.000420454193230350,
"coingecko",
),
]
# truncate table
with engine.connect() as conn:
conn.execute(text("TRUNCATE prices"))
conn.commit()
# write data
db.write_prices_new(token_prices)
# read data
with engine.connect() as conn:
res = conn.execute(
text("SELECT token_address, time, price, source FROM prices")
).all()
# cleaning up the duplicate entry
token_prices = token_prices[:1]
for i, (token_address, time, price, source) in enumerate(token_prices):
assert HexBytes(res[i][0]) == HexBytes(token_address)
fhenneke marked this conversation as resolved.
Show resolved Hide resolved
assert res[i][1].replace(tzinfo=timezone.utc).timestamp() == time
assert float(res[i][2]) == price
assert res[i][3] == source

Expand Down
Loading