From 9b020de7e95e06e065f5ab89d60403ef0bbb2246 Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 02:30:54 +0300 Subject: [PATCH 01/12] small fixes and cleanup --- src/helpers/database.py | 8 +++++++- src/transaction_processor.py | 12 +++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index e465376..a6974a9 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -173,11 +173,17 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: ) for token_address, time, price, source in prices: try: + date_time = datetime.fromtimestamp(time, tz=timezone.utc) + check_existence_query = f"SELECT * FROM prices WHERE token_address = '\\{token_address[1:]}' and time = '{date_time}' and source = '{source}'" + result = self.execute_query(check_existence_query, {}).fetchone() + if result is not None: + continue + self.execute_and_commit( query, { "token_address": bytes.fromhex(token_address[2:]), - "time": datetime.fromtimestamp(time, tz=timezone.utc), + "time": date_time, "price": price, "source": source, }, diff --git a/src/transaction_processor.py b/src/transaction_processor.py index af42ace..47b69e7 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -119,9 +119,7 @@ def process_single_transaction( self.log_message = [] try: # compute raw token imbalances - token_imbalances = self.process_token_imbalances( - tx_hash, auction_id, block_number - ) + token_imbalances = self.process_token_imbalances(tx_hash) # get transaction timestamp transaction_timestamp = self.blockchain_data.get_transaction_timestamp( @@ -134,9 +132,8 @@ def process_single_transaction( # transaction_tokens = self.blockchain_data.get_transaction_tokens(tx_hash) # store transaction tokens transaction_tokens = [] - for token_address, imbalance in token_imbalances.items(): - if imbalance != 0: - transaction_tokens.append((tx_hash, token_address)) + for token_address in token_imbalances.keys(): + transaction_tokens.append((tx_hash, token_address)) self.db.write_transaction_tokens(transaction_tokens) # update token decimals @@ -202,7 +199,8 @@ def process_single_transaction( return def process_token_imbalances( - self, tx_hash: str, auction_id: int, block_number: int + self, + tx_hash: str, ) -> dict[str, int]: """Process token imbalances for a given transaction and return imbalances.""" try: From 5ab545e4c963fd08e2ca0cb6314b0ec129bc581e Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 10:02:46 +0300 Subject: [PATCH 02/12] modify existing test --- src/helpers/database.py | 1 + tests/unit/test_database.py | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/src/helpers/database.py b/src/helpers/database.py index a6974a9..523618c 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -177,6 +177,7 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: check_existence_query = f"SELECT * FROM prices WHERE token_address = '\\{token_address[1:]}' and time = '{date_time}' and source = '{source}'" result = self.execute_query(check_existence_query, {}).fetchone() if result is not None: + logger.info("Skipping INSERT operation as Entry already exists in PRICES table.") continue self.execute_and_commit( diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 67fe97c..bc74d9b 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -85,6 +85,12 @@ def tests_write_prices(): 0.000000050569218629, "moralis", ), + ( + "0x68BBED6A47194EFF1CF514B50EA91895597FC91E", + int(datetime.fromisoformat("2024-10-10 16:49:47.000000").timestamp()), + 0.000000050569218629, + "moralis", + ), ] # truncate table with engine.connect() as conn: From e21eaa92b40e2ff36271699900e82c1538b2380f Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 10:04:05 +0300 Subject: [PATCH 03/12] modify info msg --- src/helpers/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index 523618c..e0f679f 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -177,7 +177,7 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: check_existence_query = f"SELECT * FROM prices WHERE token_address = '\\{token_address[1:]}' and time = '{date_time}' and source = '{source}'" result = self.execute_query(check_existence_query, {}).fetchone() if result is not None: - logger.info("Skipping INSERT operation as Entry already exists in PRICES table.") + logger.info("Skipping INSERT operation as entry already exists in PRICES table.") continue self.execute_and_commit( From 950dcc1bda0c3eb2f1379258048fa5ab0e4483f9 Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 10:21:34 +0300 Subject: [PATCH 04/12] black fix --- src/helpers/database.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index e0f679f..a31b8c5 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -177,7 +177,9 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: check_existence_query = f"SELECT * FROM prices WHERE token_address = '\\{token_address[1:]}' and time = '{date_time}' and source = '{source}'" result = self.execute_query(check_existence_query, {}).fetchone() if result is not None: - logger.info("Skipping INSERT operation as entry already exists in PRICES table.") + logger.info( + "Skipping INSERT operation as entry already exists in PRICES table." + ) continue self.execute_and_commit( From e2f328a37ce99f21dd26b4c77373ad8f84a9d9bf Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 10:33:58 +0300 Subject: [PATCH 05/12] fix test again --- tests/unit/test_database.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index bc74d9b..b40af1b 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -72,6 +72,7 @@ def tests_write_prices(): f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" ) db = Database(engine, "mainnet") + # list contains duplicate entry in order to test how this is handled token_prices = [ ( "0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48", @@ -103,6 +104,8 @@ def tests_write_prices(): res = conn.execute( text("SELECT token_address, time, price, source FROM prices") ).all() + # cleaning up the duplicate entry + token_prices = token_prices[:2] for i, (token_address, time, price, source) in enumerate(token_prices): assert HexBytes(res[i][0]) == HexBytes(token_address) assert res[i][1].timestamp() == time From efc12f8090ecff588a5bf2dd8cbc3bd3d0aa2225 Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 15:36:18 +0300 Subject: [PATCH 06/12] address comment --- src/helpers/database.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index a31b8c5..0fa98d0 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -43,6 +43,18 @@ def execute_and_commit(self, query: str, params: dict): connection.rollback() raise + def get_price(self, token_address, time, source): + query = "SELECT * FROM prices WHERE token_address = :token_address AND time = :time AND source = :source" + result = self.execute_query( + query, + { + "token_address": bytes.fromhex(token_address[2:]), + "time": datetime.fromtimestamp(time, tz=timezone.utc), + "source": source, + }, + ) + return result.fetchall() + def write_token_imbalances( self, tx_hash: str, @@ -173,10 +185,7 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: ) for token_address, time, price, source in prices: try: - date_time = datetime.fromtimestamp(time, tz=timezone.utc) - check_existence_query = f"SELECT * FROM prices WHERE token_address = '\\{token_address[1:]}' and time = '{date_time}' and source = '{source}'" - result = self.execute_query(check_existence_query, {}).fetchone() - if result is not None: + if self.get_price(token_address, time, source) is not None: logger.info( "Skipping INSERT operation as entry already exists in PRICES table." ) @@ -186,7 +195,7 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: query, { "token_address": bytes.fromhex(token_address[2:]), - "time": date_time, + "time": datetime.fromtimestamp(time, tz=timezone.utc), "price": price, "source": source, }, From 3288000a755a5d8f481efb4b6d3fafb7e064c64a Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 15:41:42 +0300 Subject: [PATCH 07/12] rearrange get transaction tokens functionality --- src/helpers/blockchain_data.py | 13 ------------- src/transaction_processor.py | 9 +++------ tests/e2e/test_blockchain_data.py | 5 ++++- 3 files changed, 7 insertions(+), 20 deletions(-) diff --git a/src/helpers/blockchain_data.py b/src/helpers/blockchain_data.py index 072e9c4..b4d55f7 100644 --- a/src/helpers/blockchain_data.py +++ b/src/helpers/blockchain_data.py @@ -83,19 +83,6 @@ def get_transaction_timestamp(self, tx_hash: str) -> tuple[str, int]: return tx_hash, timestamp - def get_transaction_tokens(self, tx_hash: str) -> list[tuple[str, str]]: - receipt = self.web3.eth.get_transaction_receipt(HexStr(tx_hash)) - - transfer_topic = self.web3.keccak(text="Transfer(address,address,uint256)") - - token_addresses: set[str] = set() - for log in receipt["logs"]: - if log["topics"] and log["topics"][0] == transfer_topic: - token_address = log["address"] - token_addresses.add(token_address) - - return [(tx_hash, token_address) for token_address in token_addresses] - def get_token_decimals(self, token_address: str) -> int: """Get number of decimals for a token.""" contract = self.web3.eth.contract( diff --git a/src/transaction_processor.py b/src/transaction_processor.py index 47b69e7..a003826 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -118,9 +118,6 @@ def process_single_transaction( """Function processes a single tx to find imbalances, fees, prices including writing to database.""" self.log_message = [] try: - # compute raw token imbalances - token_imbalances = self.process_token_imbalances(tx_hash) - # get transaction timestamp transaction_timestamp = self.blockchain_data.get_transaction_timestamp( tx_hash @@ -128,12 +125,12 @@ def process_single_transaction( # store transaction timestamp self.db.write_transaction_timestamp(transaction_timestamp) - # get transaction tokens - # transaction_tokens = self.blockchain_data.get_transaction_tokens(tx_hash) - # store transaction tokens + # get transaction tokens by first computing raw token imbalances + token_imbalances = self.process_token_imbalances(tx_hash) transaction_tokens = [] for token_address in token_imbalances.keys(): transaction_tokens.append((tx_hash, token_address)) + # store transaction tokens self.db.write_transaction_tokens(transaction_tokens) # update token decimals diff --git a/tests/e2e/test_blockchain_data.py b/tests/e2e/test_blockchain_data.py index 370b33d..61c7607 100644 --- a/tests/e2e/test_blockchain_data.py +++ b/tests/e2e/test_blockchain_data.py @@ -32,7 +32,10 @@ def test_get_transaction_tokens(): blockchain = BlockchainData(web3) tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - transaction_tokens = blockchain.get_transaction_tokens(tx_hash) + token_imbalances = blockchain.process_token_imbalances(tx_hash) + transaction_tokens = [] + for token_address in token_imbalances.keys(): + transaction_tokens.append((tx_hash, token_address)) assert all(h == tx_hash for h, _ in transaction_tokens) assert set(token_address for _, token_address in transaction_tokens) == { From e284c5ab2d84c3655454f4bfc74f57f87724bc4c Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 15:56:06 +0300 Subject: [PATCH 08/12] Remove get transaction tokens test --- tests/e2e/test_blockchain_data.py | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/tests/e2e/test_blockchain_data.py b/tests/e2e/test_blockchain_data.py index 61c7607..c5fcb29 100644 --- a/tests/e2e/test_blockchain_data.py +++ b/tests/e2e/test_blockchain_data.py @@ -25,25 +25,3 @@ def test_get_transaction_timestamp(): transaction_timestamp = blockchain.get_transaction_timestamp(tx_hash) assert transaction_timestamp == (tx_hash, 1728044411) - - -def test_get_transaction_tokens(): - web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) - blockchain = BlockchainData(web3) - tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - - token_imbalances = blockchain.process_token_imbalances(tx_hash) - transaction_tokens = [] - for token_address in token_imbalances.keys(): - transaction_tokens.append((tx_hash, token_address)) - - assert all(h == tx_hash for h, _ in transaction_tokens) - assert set(token_address for _, token_address in transaction_tokens) == { - "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", - "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", - "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee", - "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", - "0xdAC17F958D2ee523a2206206994597C13D831ec7", - "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637", - "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9", - } From 16a08f25749ede8e0dfc3620ae1ce4c877db5cb6 Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 16:01:56 +0300 Subject: [PATCH 09/12] Add duplicate write prices test --- tests/unit/test_database.py | 45 ++++++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index b40af1b..03596ec 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -86,11 +86,44 @@ def tests_write_prices(): 0.000000050569218629, "moralis", ), + ] + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE prices")) + conn.commit() + # write data + db.write_prices_new(token_prices) + # read data + with engine.connect() as conn: + res = conn.execute( + text("SELECT token_address, time, price, source FROM prices") + ).all() + # cleaning up the duplicate entry + for i, (token_address, time, price, source) in enumerate(token_prices): + assert HexBytes(res[i][0]) == HexBytes(token_address) + assert res[i][1].replace(tzinfo=timezone.utc).timestamp() == time + assert float(res[i][2]) == price + assert res[i][3] == source + + +def tests_write_duplicate_prices(): + engine = create_engine( + f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + # list contains duplicate entry in order to test how this is handled + token_prices = [ ( - "0x68BBED6A47194EFF1CF514B50EA91895597FC91E", - int(datetime.fromisoformat("2024-10-10 16:49:47.000000").timestamp()), - 0.000000050569218629, - "moralis", + "0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48", + int(datetime.fromisoformat("2024-10-10 16:48:47.000000").timestamp()), + 0.000420454193230350, + "coingecko", + ), + ( + "0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48", + int(datetime.fromisoformat("2024-10-10 16:48:47.000000").timestamp()), + 0.000420454193230350, + "coingecko", ), ] # truncate table @@ -105,10 +138,10 @@ def tests_write_prices(): text("SELECT token_address, time, price, source FROM prices") ).all() # cleaning up the duplicate entry - token_prices = token_prices[:2] + token_prices = token_prices[:1] for i, (token_address, time, price, source) in enumerate(token_prices): assert HexBytes(res[i][0]) == HexBytes(token_address) - assert res[i][1].timestamp() == time + assert res[i][1].replace(tzinfo=timezone.utc).timestamp() == time assert float(res[i][2]) == price assert res[i][3] == source From 768f08b706b2434494ea4efa3e301d8bbcef2da7 Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 16:03:16 +0300 Subject: [PATCH 10/12] remove comment --- tests/unit/test_database.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 03596ec..a1c7f1c 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -98,7 +98,6 @@ def tests_write_prices(): res = conn.execute( text("SELECT token_address, time, price, source FROM prices") ).all() - # cleaning up the duplicate entry for i, (token_address, time, price, source) in enumerate(token_prices): assert HexBytes(res[i][0]) == HexBytes(token_address) assert res[i][1].replace(tzinfo=timezone.utc).timestamp() == time From bd1d5d5f085ec558b53744e034196b93d55d3b5c Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 16:04:01 +0300 Subject: [PATCH 11/12] remove another comment --- tests/unit/test_database.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index a1c7f1c..730e629 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -72,7 +72,6 @@ def tests_write_prices(): f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" ) db = Database(engine, "mainnet") - # list contains duplicate entry in order to test how this is handled token_prices = [ ( "0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48", From 1f3bfe61de7ec78e9c201faa238c4218a6ac24bc Mon Sep 17 00:00:00 2001 From: harisang Date: Wed, 16 Oct 2024 16:09:51 +0300 Subject: [PATCH 12/12] fetchone instead of fetchall in get prices query --- src/helpers/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index 0fa98d0..7fa9fd7 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -53,7 +53,7 @@ def get_price(self, token_address, time, source): "source": source, }, ) - return result.fetchall() + return result.fetchone() def write_token_imbalances( self,