From a6441a4c7af6aaf2f4c1cbd3e69ea912a8126524 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Tue, 8 Oct 2024 16:39:49 +0200 Subject: [PATCH 01/39] add function to get transaction_token_timestamps data --- src/imbalances_script.py | 19 +++++++++++++++++++ tests/e2e/test_imbalances_script.py | 23 +++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/src/imbalances_script.py b/src/imbalances_script.py index ec91f63..85ffa7d 100644 --- a/src/imbalances_script.py +++ b/src/imbalances_script.py @@ -333,6 +333,25 @@ def compute_imbalances(self, tx_hash: str) -> dict[str, int]: raise +def get_transaction_token_timestamps( + tx_hash: str, web3: Web3 +) -> list[tuple[str, str, int]]: + receipt = web3.eth.get_transaction_receipt(HexStr(tx_hash)) + block_number = receipt["blockNumber"] + block = web3.eth.get_block(block_number) + timestamp = block["timestamp"] + + transfer_topic = web3.keccak(text="Transfer(address,address,uint256)") + + token_addresses: set[str] = set() + for log in receipt["logs"]: + if log["topics"] and log["topics"][0] == transfer_topic: + token_address = log["address"] + token_addresses.add(token_address) + + return [(tx_hash, token_address, timestamp) for token_address in token_addresses] + + def main() -> None: """main function for finding imbalance for a single tx hash.""" tx_hash = input("Enter transaction hash: ") diff --git a/tests/e2e/test_imbalances_script.py b/tests/e2e/test_imbalances_script.py index dfbaa34..7a3554c 100644 --- a/tests/e2e/test_imbalances_script.py +++ b/tests/e2e/test_imbalances_script.py @@ -35,3 +35,26 @@ def tests_process_single_transaction(set_env_variables): "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637": 275548164523, "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE": 0, } + + +def test_get_transaction_token_timestamps(set_env_variables): + from src.imbalances_script import get_transaction_token_timestamps + + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + + transaction_token_timestamps = get_transaction_token_timestamps(tx_hash, web3) + + assert all(h == tx_hash for h, _, _ in transaction_token_timestamps) + assert all(t == 1728044411 for _, _, t in transaction_token_timestamps) + assert set( + token_address for _, token_address, _ in transaction_token_timestamps + ) == { + "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", + "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", + "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee", + "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", + "0xdAC17F958D2ee523a2206206994597C13D831ec7", + "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637", + "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9", + } From 7f94a3b1ad0a770653e23dba0e0071eaf240b83f Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Tue, 8 Oct 2024 17:10:15 +0200 Subject: [PATCH 02/39] adapt getter functions to new tables --- src/imbalances_script.py | 14 +++++++++----- src/transaction_processor.py | 6 +++++- tests/e2e/test_imbalances_script.py | 24 ++++++++++++++++-------- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/imbalances_script.py b/src/imbalances_script.py index 85ffa7d..fccb51d 100644 --- a/src/imbalances_script.py +++ b/src/imbalances_script.py @@ -24,7 +24,7 @@ from web3 import Web3 from web3.datastructures import AttributeDict -from web3.types import TxReceipt +from web3.types import HexStr, TxReceipt from src.helpers.config import CHAIN_RPC_ENDPOINTS, logger from src.constants import ( SETTLEMENT_CONTRACT_ADDRESS, @@ -333,14 +333,18 @@ def compute_imbalances(self, tx_hash: str) -> dict[str, int]: raise -def get_transaction_token_timestamps( - tx_hash: str, web3: Web3 -) -> list[tuple[str, str, int]]: +def get_transaction_timestamp(tx_hash: str, web3: Web3) -> tuple[str, int]: receipt = web3.eth.get_transaction_receipt(HexStr(tx_hash)) block_number = receipt["blockNumber"] block = web3.eth.get_block(block_number) timestamp = block["timestamp"] + return tx_hash, timestamp + + +def get_transaction_tokens(tx_hash: str, web3: Web3) -> list[tuple[str, str]]: + receipt = web3.eth.get_transaction_receipt(HexStr(tx_hash)) + transfer_topic = web3.keccak(text="Transfer(address,address,uint256)") token_addresses: set[str] = set() @@ -349,7 +353,7 @@ def get_transaction_token_timestamps( token_address = log["address"] token_addresses.add(token_address) - return [(tx_hash, token_address, timestamp) for token_address in token_addresses] + return [(tx_hash, token_address) for token_address in token_addresses] def main() -> None: diff --git a/src/transaction_processor.py b/src/transaction_processor.py index c695b92..a226432 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -2,7 +2,11 @@ from web3 import Web3 from src.helpers.blockchain_data import BlockchainData from src.helpers.database import Database -from src.imbalances_script import RawTokenImbalances +from src.imbalances_script import ( + RawTokenImbalances, + get_transaction_timestamp, + get_transaction_tokens, +) from src.price_providers.price_feed import PriceFeed from src.helpers.helper_functions import read_sql_file, set_params from src.helpers.config import CHAIN_SLEEP_TIME, logger diff --git a/tests/e2e/test_imbalances_script.py b/tests/e2e/test_imbalances_script.py index 7a3554c..78d3fbc 100644 --- a/tests/e2e/test_imbalances_script.py +++ b/tests/e2e/test_imbalances_script.py @@ -37,19 +37,27 @@ def tests_process_single_transaction(set_env_variables): } -def test_get_transaction_token_timestamps(set_env_variables): - from src.imbalances_script import get_transaction_token_timestamps +def test_get_transaction_timestamp(set_env_variables): + from src.imbalances_script import get_transaction_timestamp web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - transaction_token_timestamps = get_transaction_token_timestamps(tx_hash, web3) + transaction_timestamp = get_transaction_timestamp(tx_hash, web3) - assert all(h == tx_hash for h, _, _ in transaction_token_timestamps) - assert all(t == 1728044411 for _, _, t in transaction_token_timestamps) - assert set( - token_address for _, token_address, _ in transaction_token_timestamps - ) == { + assert transaction_timestamp == (tx_hash, 1728044411) + + +def test_get_transaction_tokens(set_env_variables): + from src.imbalances_script import get_transaction_tokens + + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + + transaction_tokens = get_transaction_tokens(tx_hash, web3) + + assert all(h == tx_hash for h, _ in transaction_tokens) + assert set(token_address for _, token_address in transaction_tokens) == { "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee", From db155ddd90e59566cb64e9f6941cfa0e255f3039 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Tue, 8 Oct 2024 18:11:16 +0200 Subject: [PATCH 03/39] add functionality for writing to transaction_timestamps --- src/helpers/database.py | 21 ++++++++++++++++++- tests/e2e/test_database.py | 43 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 tests/e2e/test_database.py diff --git a/src/helpers/database.py b/src/helpers/database.py index 23635e2..1212e70 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -1,5 +1,8 @@ -from sqlalchemy import text +from datetime import datetime + +from sqlalchemy import text, func from sqlalchemy.engine import Engine + from src.helpers.config import check_db_connection, logger from src.helpers.helper_functions import read_sql_file from src.constants import NULL_ADDRESS_STRING @@ -124,3 +127,19 @@ def write_fees( "fee_recipient": final_recipient, }, ) + + def write_transaction_timestamp( + self, transaction_timestamp: tuple[str, int] + ) -> None: + """Function writes the transaction timestamp to the table.""" + query = ( + "INSERT INTO transaction_timestamps (tx_hash, time) " + "VALUES (:tx_hash, :time);" + ) + self.execute_and_commit( + query, + { + "tx_hash": bytes.fromhex(transaction_timestamp[0][2:]), + "time": datetime.fromtimestamp(transaction_timestamp[1]), + }, + ) diff --git a/tests/e2e/test_database.py b/tests/e2e/test_database.py new file mode 100644 index 0000000..1c3a640 --- /dev/null +++ b/tests/e2e/test_database.py @@ -0,0 +1,43 @@ +from os import getenv, environ +from unittest.mock import patch + +import pytest +from sqlalchemy import create_engine, text + + +@pytest.fixture() +def set_env_variables(monkeypatch): + with patch.dict(environ, clear=True): + envvars = { + "CHAIN_SLEEP_TIME": "1", + } + for k, v in envvars.items(): + monkeypatch.setenv(k, v) + yield # This is the magical bit which restore the environment after + + +def tests_write_transaction_timestamp(set_env_variables): + # import has to happen after patching environment variable + from src.helpers.database import Database + + engine = create_engine( + f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + # write data + db.write_transaction_timestamp( + ( + "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c", + 1728044411, + ) + ) + # read data + with engine.connect() as conn: + res = conn.execute( + text("SELECT tx_hash, time FROM transaction_timestamps") + ).one() + assert ( + "0x" + bytes(res[0]).hex() + == "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + ) + assert res[1].timestamp() == 1728044411 From 57b7398c2a444786e48e8190b030ccb88e390611 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Tue, 8 Oct 2024 18:28:53 +0200 Subject: [PATCH 04/39] add test for writing transaction_tokens --- src/helpers/database.py | 17 +++++++++++++++ tests/{e2e => unit}/test_database.py | 31 ++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) rename tests/{e2e => unit}/test_database.py (53%) diff --git a/src/helpers/database.py b/src/helpers/database.py index 1212e70..024e160 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -143,3 +143,20 @@ def write_transaction_timestamp( "time": datetime.fromtimestamp(transaction_timestamp[1]), }, ) + + def write_transaction_tokens( + self, transaction_tokens: list[tuple[str, str]] + ) -> None: + """Function writes the transaction timestamp to the table.""" + query = ( + "INSERT INTO transaction_tokens (tx_hash, token_address) " + "VALUES (:tx_hash, :token_address);" + ) + for tx_hash, token_address in transaction_tokens: + self.execute_and_commit( + query, + { + "tx_hash": bytes.fromhex(tx_hash[2:]), + "token_address": bytes.fromhex(token_address[2:]), + }, + ) diff --git a/tests/e2e/test_database.py b/tests/unit/test_database.py similarity index 53% rename from tests/e2e/test_database.py rename to tests/unit/test_database.py index 1c3a640..e62561b 100644 --- a/tests/e2e/test_database.py +++ b/tests/unit/test_database.py @@ -1,6 +1,7 @@ from os import getenv, environ from unittest.mock import patch +from hexbytes import HexBytes import pytest from sqlalchemy import create_engine, text @@ -41,3 +42,33 @@ def tests_write_transaction_timestamp(set_env_variables): == "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" ) assert res[1].timestamp() == 1728044411 + + +def tests_write_transaction_tokens(set_env_variables): + # import has to happen after patching environment variable + from src.helpers.database import Database + + engine = create_engine( + f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + transaction_tokens = [ + ( + "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c", + "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", + ), + ( + "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c", + "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", + ), + ] + # write data + db.write_transaction_tokens(transaction_tokens) + # read data + with engine.connect() as conn: + res = conn.execute( + text("SELECT tx_hash, token_address FROM transaction_tokens") + ).all() + for i, (tx_hash, token_address) in enumerate(transaction_tokens): + assert HexBytes(res[i][0]) == HexBytes(tx_hash) + assert HexBytes(res[i][1]) == HexBytes(token_address) From 29a63b02ff16cd6cc9508c3c9a098852f1ad0a10 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 11:30:04 +0200 Subject: [PATCH 05/39] add legacy schema to test database --- Dockerfile.test_db | 2 +- database/00_legacy_tables.sql | 36 +++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 database/00_legacy_tables.sql diff --git a/Dockerfile.test_db b/Dockerfile.test_db index 0682417..b8b1064 100644 --- a/Dockerfile.test_db +++ b/Dockerfile.test_db @@ -1,4 +1,4 @@ FROM postgres ENV POSTGRES_PASSWORD=postgres ENV POSTGRES_DB=mainnet -COPY ./database/01_table_creation.sql /docker-entrypoint-initdb.d/ +COPY ./database/* /docker-entrypoint-initdb.d/ diff --git a/database/00_legacy_tables.sql b/database/00_legacy_tables.sql new file mode 100644 index 0000000..67e82b5 --- /dev/null +++ b/database/00_legacy_tables.sql @@ -0,0 +1,36 @@ +-- Database Schema for Token Imbalances and Slippage + +-- Table: raw_token_imbalances (for storing raw token imbalances) +CREATE TABLE raw_token_imbalances ( + auction_id BIGINT NOT NULL, + chain_name VARCHAR(50) NOT NULL, + block_number BIGINT NOT NULL, + tx_hash BYTEA NOT NULL, + token_address BYTEA NOT NULL, + imbalance NUMERIC(78,0), + PRIMARY KEY (tx_hash, token_address) +); + +-- Table: slippage_prices (for storing per unit token prices in ETH) +CREATE TABLE slippage_prices ( + chain_name VARCHAR(50) NOT NULL, + source VARCHAR(50) NOT NULL, + block_number BIGINT NOT NULL, + tx_hash BYTEA NOT NULL, + token_address BYTEA NOT NULL, + price NUMERIC(42,18), + PRIMARY KEY (tx_hash, token_address) +); + +-- Table: Stores fees (i.e. protocol fee, network fee on per token basis) +CREATE TABLE fees_new ( + chain_name VARCHAR(50) NOT NULL, + auction_id BIGINT NOT NULL, + block_number BIGINT NOT NULL, + tx_hash BYTEA NOT NULL, + order_uid BYTEA NOT NULL, + token_address BYTEA NOT NULL, + fee_amount NUMERIC(78,0) NOT NULL, + fee_type VARCHAR(50) NOT NULL, -- e.g. "protocol" or "network" + PRIMARY KEY (tx_hash, order_uid, token_address, fee_type) +); From 5b0a4617f2e1f90b13e9df86a82ee7db1cde3fe8 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 11:31:07 +0200 Subject: [PATCH 06/39] add functionality to get latest transaction from database --- src/helpers/database.py | 15 ++++++++++- src/transaction_processor.py | 52 +++++++++++++++++++++++------------- tests/unit/test_database.py | 51 +++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 19 deletions(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index 024e160..37bbcb3 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -1,6 +1,7 @@ from datetime import datetime -from sqlalchemy import text, func +from hexbytes import HexBytes +from sqlalchemy import text from sqlalchemy.engine import Engine from src.helpers.config import check_db_connection, logger @@ -160,3 +161,15 @@ def write_transaction_tokens( "token_address": bytes.fromhex(token_address[2:]), }, ) + + def get_latest_transaction(self) -> str | None: + """Get latest transaction hash. + If no transaction is found, return None.""" + query = "SELECT tx_hash FROM transaction_timestamps ORDER BY time DESC LIMIT 1;" + result = self.execute_query(query, {}).fetchone() + + if result is None: + return None + + latest_tx_hash = HexBytes(result[0]).to_0x_hex() + return latest_tx_hash diff --git a/src/transaction_processor.py b/src/transaction_processor.py index a226432..a96dabd 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -47,25 +47,41 @@ def get_start_block(self) -> int: If no entries are present, fallback to get_finalized_block_number(). """ try: - # Query for the maximum block number - query_max_block = read_sql_file("src/sql/select_max_block.sql") - result = self.db.execute_query( - query_max_block, {"chain_name": self.chain_name} - ) - row = result.fetchone() - max_block = row[0] if row is not None else None - blockchain_latest_block = self.blockchain_data.get_latest_block() - - # If no entries present, fallback to get_latest_block() - if max_block is None: - return blockchain_latest_block - - logger.info("Fetched max block number from database: %d", max_block) - if max_block > blockchain_latest_block - 7200: - return max_block + 1 + # 1) get last transaction from DB + latest_tx_hash = self.db.get_latest_transaction() + # 2) get block of that transaction + if latest_tx_hash: + block_number = int( + self.blockchain_data.web3.eth.get_transaction_receipt( + HexBytes(latest_tx_hash) + )["blockNumber"] + ) else: - # TODO: Remove this rule before moving to production. - return blockchain_latest_block + logger.warning( + f"No transaction found in database. Using recent block instead." + ) + block_number = self.blockchain_data.get_latest_block() + return block_number + 1 + + # # Query for the maximum block number + # query_max_block = read_sql_file("src/sql/select_max_block.sql") + # result = self.db.execute_query( + # query_max_block, {"chain_name": self.chain_name} + # ) + # row = result.fetchone() + # max_block = row[0] if row is not None else None + # blockchain_latest_block = self.blockchain_data.get_latest_block() + # + # # If no entries present, fallback to get_latest_block() + # if max_block is None: + # return blockchain_latest_block + # + # logger.info("Fetched max block number from database: %d", max_block) + # if max_block > blockchain_latest_block - 7200: + # return max_block + 1 + # else: + # # TODO: Remove this rule before moving to production. + # return blockchain_latest_block except Exception as e: logger.error("Error fetching start block from database: %s", e) raise diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index e62561b..392bb2c 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -25,6 +25,10 @@ def tests_write_transaction_timestamp(set_env_variables): f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" ) db = Database(engine, "mainnet") + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE transaction_timestamps")) + conn.commit() # write data db.write_transaction_timestamp( ( @@ -62,6 +66,10 @@ def tests_write_transaction_tokens(set_env_variables): "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", ), ] + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE transaction_tokens")) + conn.commit() # write data db.write_transaction_tokens(transaction_tokens) # read data @@ -72,3 +80,46 @@ def tests_write_transaction_tokens(set_env_variables): for i, (tx_hash, token_address) in enumerate(transaction_tokens): assert HexBytes(res[i][0]) == HexBytes(tx_hash) assert HexBytes(res[i][1]) == HexBytes(token_address) + + +def test_get_latest_transaction(set_env_variables): + # import has to happen after patching environment variable + from src.helpers.database import Database + + engine = create_engine( + f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE transaction_timestamps")) + conn.commit() + # check that empty table returns None + assert db.get_latest_transaction() is None + # write data + db.write_transaction_timestamp( + ( + "0x99F10B2DE2B04DFC729B6C46FC5510C44424C213106ED77C80691FA0DD08F3CF", + 1728459935, + ) + ) + db.write_transaction_timestamp( + ( + "0xDFBB14E8F0E47FFC105A16043B2ECF536B323AC3B3B1D319A2D635E392E75BB9", + 1728459995, # latest time stamp + ) + ) + db.write_transaction_timestamp( + ( + "0xF153C9EF2D54C656182B9BD0484B4C1C1A317781656EAF615FA0A92D7C3AFDF7", + 1728459959, + ) + ) + # read data + tx_hash = db.get_latest_transaction() + assert ( + tx_hash + == HexBytes( + "0xDFBB14E8F0E47FFC105A16043B2ECF536B323AC3B3B1D319A2D635E392E75BB9" + ).to_0x_hex() + ) From 1e9f327ce1fe61fa1071663d2fcd2ca50553f63c Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 11:31:51 +0200 Subject: [PATCH 07/39] adapt daemon to store timestamps and tokens --- src/transaction_processor.py | 45 ++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/src/transaction_processor.py b/src/transaction_processor.py index a96dabd..951a15c 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -123,6 +123,30 @@ def process_single_transaction( """Function processes a single tx to find imbalances, fees, prices including writing to database.""" self.log_message = [] try: + # get transaction timestamp + transaction_timestamp = get_transaction_timestamp( + tx_hash, self.blockchain_data.web3 + ) + # store transaction timestamp + self.db.write_transaction_timestamp(transaction_timestamp) + + # get transaction tokens + transaction_tokens = get_transaction_tokens( + tx_hash, self.blockchain_data.web3 + ) + # store transaction tokens + self.db.write_transaction_tokens(transaction_tokens) + + # get token decimals + + # store token decimals + + # get prices + prices = self.process_prices_for_tokens( + token_imbalances, block_number, tx_hash + ) + # store prices + # Compute Raw Token Imbalances if self.process_imbalances: token_imbalances = self.process_token_imbalances( @@ -208,6 +232,27 @@ def process_fees_for_transaction( logger.error(f"Failed to process fees for transaction {tx_hash}: {e}") return {}, {}, {} + def get_prices_for_tokens( + self, + transaction_timestamp: tuple[str, int], + transaction_tokens: list[tuple[str, str]], + ) -> dict[str, tuple[float, str]]: + """Fetch prices for all transferred tokens.""" + prices = {} + token_addresses = [token_address for _, token_address in transaction_tokens] + try: + for token_address in token_addresses: + price_data = self.price_providers.get_price( + set_params(token_address, block_number, tx_hash) + ) + if price_data: + price, source = price_data + prices[token_address] = (price, source) + except Exception as e: + logger.error(f"Failed to process prices for transaction {tx_hash}: {e}") + + return prices + def process_prices_for_tokens( self, token_imbalances: dict[str, int], From 2e281d06970564be5d5976591f91d7afe7e6353e Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 16:47:51 +0200 Subject: [PATCH 08/39] add token decimals function to blockchain functions --- src/helpers/blockchain_data.py | 10 ++++++++++ src/price_providers/endpoint_auction_pricing.py | 11 +++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/helpers/blockchain_data.py b/src/helpers/blockchain_data.py index df6028d..31fda76 100644 --- a/src/helpers/blockchain_data.py +++ b/src/helpers/blockchain_data.py @@ -1,5 +1,7 @@ from hexbytes import HexBytes from web3 import Web3 + +from contracts.erc20_abi import erc20_abi from src.helpers.config import logger from src.constants import SETTLEMENT_CONTRACT_ADDRESS, INVALIDATED_ORDER_TOPIC @@ -71,3 +73,11 @@ def get_auction_id(self, tx_hash: str) -> int: # convert bytes to int auction_id = int.from_bytes(call_data_bytes[-8:], byteorder="big") return auction_id + + +def get_token_decimals(token_address: str, web3: Web3) -> int: + """Get number of decimals for a token.""" + contract = web3.eth.contract( + address=Web3.to_checksum_address(token_address), abi=erc20_abi + ) + return contract.functions.decimals().call() diff --git a/src/price_providers/endpoint_auction_pricing.py b/src/price_providers/endpoint_auction_pricing.py index f525bee..f84334e 100644 --- a/src/price_providers/endpoint_auction_pricing.py +++ b/src/price_providers/endpoint_auction_pricing.py @@ -1,13 +1,16 @@ import requests + from src.price_providers.pricing_model import AbstractPriceProvider -from src.helpers.config import logger -from src.helpers.helper_functions import extract_params, get_token_decimals +from src.helpers.blockchain_data import get_token_decimals +from src.helpers.config import get_web3_instance, logger +from src.helpers.helper_functions import extract_params class AuctionPriceProvider(AbstractPriceProvider): """Fetch auction prices.""" def __init__(self) -> None: + self.web3 = get_web3_instance() self.endpoint_urls = { "prod": f"https://api.cow.fi/mainnet/api/v1/solver_competition/by_tx_hash/", "barn": f"https://barn.api.cow.fi/mainnet/api/v1/solver_competition/by_tx_hash/", @@ -15,7 +18,7 @@ def __init__(self) -> None: @property def name(self) -> str: - return "AuctionPrices" + return "native" def get_price(self, price_params: dict) -> float | None: """Function returns Auction price from endpoint for a token address.""" @@ -39,7 +42,7 @@ def get_price(self, price_params: dict) -> float | None: return None # calculation for converting auction price from endpoint to ETH equivalent per token unit price_in_eth = (float(price) / 10**18) * ( - 10 ** get_token_decimals(token_address) / 10**18 + 10 ** get_token_decimals(token_address, self.web3) / 10**18 ) return price_in_eth From 39ca28077f037870d71afe368f3c85dd63c70a70 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 16:49:09 +0200 Subject: [PATCH 09/39] add functionality to daemon --- src/helpers/database.py | 17 +++++++++++++++++ src/transaction_processor.py | 24 ++++++++++++++++++------ 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index 37bbcb3..70cf2c5 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -162,6 +162,23 @@ def write_transaction_tokens( }, ) + def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: + """Write prices to database.""" + query = ( + "INSERT INTO prices (token_address, time, price, source) " + "VALUES (:token_address, :time, :price, :source);" + ) + for token_address, time, price, source in prices: + self.execute_and_commit( + query, + { + "token_address": bytes.fromhex(token_address[2:]), + "time": datetime.fromtimestamp(time), + "price": price, + "source": source, + }, + ) + def get_latest_transaction(self) -> str | None: """Get latest transaction hash. If no transaction is found, return None.""" diff --git a/src/transaction_processor.py b/src/transaction_processor.py index 951a15c..0b6292d 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -142,10 +142,11 @@ def process_single_transaction( # store token decimals # get prices - prices = self.process_prices_for_tokens( - token_imbalances, block_number, tx_hash + prices_new = self.get_prices_for_tokens( + transaction_timestamp, transaction_tokens ) # store prices + self.db.write_prices_new(prices_new) # Compute Raw Token Imbalances if self.process_imbalances: @@ -236,18 +237,29 @@ def get_prices_for_tokens( self, transaction_timestamp: tuple[str, int], transaction_tokens: list[tuple[str, str]], - ) -> dict[str, tuple[float, str]]: + ) -> list[tuple[str, int, float, str]]: """Fetch prices for all transferred tokens.""" - prices = {} + prices: list[tuple[str, int, float, str]] = [] + tx_hash = transaction_timestamp[0] + timestamp = transaction_timestamp[1] token_addresses = [token_address for _, token_address in transaction_tokens] + block_number = self.blockchain_data.web3.eth.get_transaction_receipt( + HexBytes(tx_hash) + )["blockNumber"] try: for token_address in token_addresses: price_data = self.price_providers.get_price( set_params(token_address, block_number, tx_hash) ) if price_data: - price, source = price_data - prices[token_address] = (price, source) + prices.append( + (token_address, timestamp, price_data[0], price_data[1]) + ) + else: + logger.warning( + f"Failed to fetch price for token {token_address} and" + f"transaction {tx_hash}." + ) except Exception as e: logger.error(f"Failed to process prices for transaction {tx_hash}: {e}") From 9a6f6df8d6c01107e04dca6cef1e6578aece4ce7 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 16:50:01 +0200 Subject: [PATCH 10/39] rename price feeds to lower case --- src/price_providers/coingecko_pricing.py | 2 +- src/price_providers/dune_pricing.py | 2 +- src/price_providers/moralis_pricing.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/price_providers/coingecko_pricing.py b/src/price_providers/coingecko_pricing.py index f296180..9933e69 100644 --- a/src/price_providers/coingecko_pricing.py +++ b/src/price_providers/coingecko_pricing.py @@ -29,7 +29,7 @@ def __init__(self) -> None: @property def name(self) -> str: - return "Coingecko" + return "coingecko" def fetch_coingecko_list(self) -> list[dict]: """ diff --git a/src/price_providers/dune_pricing.py b/src/price_providers/dune_pricing.py index 5954ec0..39c36f6 100644 --- a/src/price_providers/dune_pricing.py +++ b/src/price_providers/dune_pricing.py @@ -23,7 +23,7 @@ def __init__(self) -> None: @property def name(self) -> str: - return "Dune" + return "dune" def initialize_dune_client(self) -> DuneClient | None: """ diff --git a/src/price_providers/moralis_pricing.py b/src/price_providers/moralis_pricing.py index 9cdd968..c4e778b 100644 --- a/src/price_providers/moralis_pricing.py +++ b/src/price_providers/moralis_pricing.py @@ -18,7 +18,7 @@ def __init__(self) -> None: @property def name(self) -> str: - return "Moralis" + return "moralis" @staticmethod def wei_to_eth(price: str) -> float | None: From 4ea04079bf5c14d723874a44d8c5e57ae70e7056 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 16:50:37 +0200 Subject: [PATCH 11/39] skip native price when fetching prices --- src/price_providers/price_feed.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/price_providers/price_feed.py b/src/price_providers/price_feed.py index afed5ee..412af3a 100644 --- a/src/price_providers/price_feed.py +++ b/src/price_providers/price_feed.py @@ -1,9 +1,13 @@ +from web3.types import HexStr + from src.price_providers.coingecko_pricing import CoingeckoPriceProvider from src.price_providers.dune_pricing import DunePriceProvider from src.price_providers.moralis_pricing import MoralisPriceProvider from src.price_providers.endpoint_auction_pricing import AuctionPriceProvider from src.helpers.config import logger +NATIVE_TOKEN = HexStr("0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE") + class PriceFeed: """Class encapsulating the different price providers.""" @@ -22,7 +26,11 @@ def get_price(self, price_params: dict) -> tuple[float, str] | None: """Function iterates over list of price provider objects and attempts to get a price.""" for provider in self.providers: try: - price = provider.get_price(price_params) + price: float | None = None + if HexStr(price_params["token_address"]) == NATIVE_TOKEN: + price = 1.0 + else: + price = provider.get_price(price_params) if price is not None: return price, provider.name except Exception as e: From a014562cd1bb1432dff0baba1c8539fdc4c2ef5a Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 16:50:50 +0200 Subject: [PATCH 12/39] reorder tests --- tests/e2e/test_imbalances_script.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/e2e/test_imbalances_script.py b/tests/e2e/test_imbalances_script.py index db49b38..91f3c20 100644 --- a/tests/e2e/test_imbalances_script.py +++ b/tests/e2e/test_imbalances_script.py @@ -2,7 +2,11 @@ from web3 import Web3 -from src.imbalances_script import RawTokenImbalances +from src.imbalances_script import ( + RawTokenImbalances, + get_transaction_timestamp, + get_transaction_tokens, +) def tests_process_single_transaction(): @@ -23,9 +27,7 @@ def tests_process_single_transaction(): } -def test_get_transaction_timestamp(set_env_variables): - from src.imbalances_script import get_transaction_timestamp - +def test_get_transaction_timestamp(): web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" @@ -34,9 +36,7 @@ def test_get_transaction_timestamp(set_env_variables): assert transaction_timestamp == (tx_hash, 1728044411) -def test_get_transaction_tokens(set_env_variables): - from src.imbalances_script import get_transaction_tokens - +def test_get_transaction_tokens(): web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" From cf41752fdcd9e124826fc528ca099bc9f406e7b7 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 16:53:27 +0200 Subject: [PATCH 13/39] add test database to CI --- .github/workflows/pull_request.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index e14548a..7b79f67 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -6,6 +6,23 @@ on: jobs: python: runs-on: ubuntu-latest + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + POSTGRES_DB: mainnet + # Set health checks to wait until postgres has started + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + # Copy repo contents into container (needed to populate DB) + volumes: + - ${{ github.workspace }}/database:/docker-entrypoint-initdb.d steps: - uses: actions/checkout@v3 - name: Setup Python 3.12 @@ -29,4 +46,5 @@ jobs: run: pytest tests/e2e/test_blockchain_data.py tests/e2e/test_imbalances_script.py env: NODE_URL: ${{ secrets.NODE_URL }} + SOLVER_SLIPPAGE_DB_URL: postgres:postgres@localhost:5432/mainnet CHAIN_SLEEP_TIME: 1 From df1118710e66c9649de3f307a0deabe69736436e Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Wed, 9 Oct 2024 17:05:44 +0200 Subject: [PATCH 14/39] minor linting changes --- src/price_providers/moralis_pricing.py | 7 +++++-- src/transaction_processor.py | 14 ++++++++++---- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/price_providers/moralis_pricing.py b/src/price_providers/moralis_pricing.py index c4e778b..bc4bb57 100644 --- a/src/price_providers/moralis_pricing.py +++ b/src/price_providers/moralis_pricing.py @@ -1,11 +1,14 @@ +import os + +from dotenv import load_dotenv from moralis import evm_api + from src.helpers.config import get_logger -import os, dotenv from src.price_providers.pricing_model import AbstractPriceProvider from src.helpers.helper_functions import extract_params -dotenv.load_dotenv() +load_dotenv() class MoralisPriceProvider(AbstractPriceProvider): diff --git a/src/transaction_processor.py b/src/transaction_processor.py index 0b6292d..b1b65e9 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -1,22 +1,28 @@ +import time + from hexbytes import HexBytes from web3 import Web3 + +from src.fees.compute_fees import compute_all_fees_of_batch from src.helpers.blockchain_data import BlockchainData +from src.helpers.config import CHAIN_SLEEP_TIME, logger from src.helpers.database import Database +from src.helpers.helper_functions import read_sql_file, set_params from src.imbalances_script import ( RawTokenImbalances, get_transaction_timestamp, get_transaction_tokens, ) from src.price_providers.price_feed import PriceFeed -from src.helpers.helper_functions import read_sql_file, set_params -from src.helpers.config import CHAIN_SLEEP_TIME, logger -from src.fees.compute_fees import compute_all_fees_of_batch -import time + +# pylint: disable=logging-fstring-interpolation class TransactionProcessor: """Class processes transactions for the slippage project.""" + # pylint: disable=too-many-instance-attributes, too-many-arguments + def __init__( self, blockchain_data: BlockchainData, From 467b5ed35b0af56dda3a831c3670d3b5f7858e7f Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 11:53:01 +0200 Subject: [PATCH 15/39] move blockchain functionality into blockchain function --- src/helpers/blockchain_data.py | 24 +++++++++++++++++++++ src/imbalances_script.py | 23 -------------------- tests/e2e/test_blockchain_data.py | 33 ++++++++++++++++++++++++++++- tests/e2e/test_imbalances_script.py | 33 +---------------------------- 4 files changed, 57 insertions(+), 56 deletions(-) diff --git a/src/helpers/blockchain_data.py b/src/helpers/blockchain_data.py index 31fda76..78ce94e 100644 --- a/src/helpers/blockchain_data.py +++ b/src/helpers/blockchain_data.py @@ -1,5 +1,6 @@ from hexbytes import HexBytes from web3 import Web3 +from web3.types import HexStr from contracts.erc20_abi import erc20_abi from src.helpers.config import logger @@ -75,6 +76,29 @@ def get_auction_id(self, tx_hash: str) -> int: return auction_id +def get_transaction_timestamp(tx_hash: str, web3: Web3) -> tuple[str, int]: + receipt = web3.eth.get_transaction_receipt(HexStr(tx_hash)) + block_number = receipt["blockNumber"] + block = web3.eth.get_block(block_number) + timestamp = block["timestamp"] + + return tx_hash, timestamp + + +def get_transaction_tokens(tx_hash: str, web3: Web3) -> list[tuple[str, str]]: + receipt = web3.eth.get_transaction_receipt(HexStr(tx_hash)) + + transfer_topic = web3.keccak(text="Transfer(address,address,uint256)") + + token_addresses: set[str] = set() + for log in receipt["logs"]: + if log["topics"] and log["topics"][0] == transfer_topic: + token_address = log["address"] + token_addresses.add(token_address) + + return [(tx_hash, token_address) for token_address in token_addresses] + + def get_token_decimals(token_address: str, web3: Web3) -> int: """Get number of decimals for a token.""" contract = web3.eth.contract( diff --git a/src/imbalances_script.py b/src/imbalances_script.py index fccb51d..baf276a 100644 --- a/src/imbalances_script.py +++ b/src/imbalances_script.py @@ -333,29 +333,6 @@ def compute_imbalances(self, tx_hash: str) -> dict[str, int]: raise -def get_transaction_timestamp(tx_hash: str, web3: Web3) -> tuple[str, int]: - receipt = web3.eth.get_transaction_receipt(HexStr(tx_hash)) - block_number = receipt["blockNumber"] - block = web3.eth.get_block(block_number) - timestamp = block["timestamp"] - - return tx_hash, timestamp - - -def get_transaction_tokens(tx_hash: str, web3: Web3) -> list[tuple[str, str]]: - receipt = web3.eth.get_transaction_receipt(HexStr(tx_hash)) - - transfer_topic = web3.keccak(text="Transfer(address,address,uint256)") - - token_addresses: set[str] = set() - for log in receipt["logs"]: - if log["topics"] and log["topics"][0] == transfer_topic: - token_address = log["address"] - token_addresses.add(token_address) - - return [(tx_hash, token_address) for token_address in token_addresses] - - def main() -> None: """main function for finding imbalance for a single tx hash.""" tx_hash = input("Enter transaction hash: ") diff --git a/tests/e2e/test_blockchain_data.py b/tests/e2e/test_blockchain_data.py index 0c21f64..49e18bf 100644 --- a/tests/e2e/test_blockchain_data.py +++ b/tests/e2e/test_blockchain_data.py @@ -2,7 +2,11 @@ from web3 import Web3 -from src.helpers.blockchain_data import BlockchainData +from src.helpers.blockchain_data import ( + BlockchainData, + get_transaction_timestamp, + get_transaction_tokens, +) def tests_get_tx_hashes_blocks(): @@ -15,3 +19,30 @@ def tests_get_tx_hashes_blocks(): "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c", 20892118, ) + + +def test_get_transaction_timestamp(): + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + + transaction_timestamp = get_transaction_timestamp(tx_hash, web3) + + assert transaction_timestamp == (tx_hash, 1728044411) + + +def test_get_transaction_tokens(): + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + + transaction_tokens = get_transaction_tokens(tx_hash, web3) + + assert all(h == tx_hash for h, _ in transaction_tokens) + assert set(token_address for _, token_address in transaction_tokens) == { + "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", + "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", + "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee", + "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", + "0xdAC17F958D2ee523a2206206994597C13D831ec7", + "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637", + "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9", + } diff --git a/tests/e2e/test_imbalances_script.py b/tests/e2e/test_imbalances_script.py index 91f3c20..ba0506d 100644 --- a/tests/e2e/test_imbalances_script.py +++ b/tests/e2e/test_imbalances_script.py @@ -2,11 +2,7 @@ from web3 import Web3 -from src.imbalances_script import ( - RawTokenImbalances, - get_transaction_timestamp, - get_transaction_tokens, -) +from src.imbalances_script import RawTokenImbalances def tests_process_single_transaction(): @@ -25,30 +21,3 @@ def tests_process_single_transaction(): "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637": 275548164523, "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE": 0, } - - -def test_get_transaction_timestamp(): - web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) - tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - - transaction_timestamp = get_transaction_timestamp(tx_hash, web3) - - assert transaction_timestamp == (tx_hash, 1728044411) - - -def test_get_transaction_tokens(): - web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) - tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - - transaction_tokens = get_transaction_tokens(tx_hash, web3) - - assert all(h == tx_hash for h, _ in transaction_tokens) - assert set(token_address for _, token_address in transaction_tokens) == { - "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", - "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", - "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee", - "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", - "0xdAC17F958D2ee523a2206206994597C13D831ec7", - "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637", - "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9", - } From 776fea47a1911b947cbc11c63e7f606637f91842 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 12:31:28 +0200 Subject: [PATCH 16/39] update psycopg --- requirements.in | 3 +-- requirements.txt | 7 +++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/requirements.in b/requirements.in index 4030533..42f0597 100644 --- a/requirements.in +++ b/requirements.in @@ -2,10 +2,9 @@ dune-client moralis pandas pandas-stubs -psycopg2 +psycopg python-dotenv requests -types-psycopg2 types-requests SQLAlchemy web3 diff --git a/requirements.txt b/requirements.txt index c8f59cb..ccd95c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.12 # by the following command: # -# pip-compile requirements.in +# pip-compile # aiohappyeyeballs==2.4.0 # via aiohttp @@ -139,7 +139,7 @@ platformdirs==4.3.6 # pylint pluggy==1.5.0 # via pytest -psycopg2==2.9.9 +psycopg==3.2.3 # via -r requirements.in pycryptodome==3.20.0 # via @@ -187,8 +187,6 @@ toolz==0.12.1 # via cytoolz types-deprecated==1.2.9.20240311 # via dune-client -types-psycopg2==2.9.21.20240819 - # via -r requirements.in types-python-dateutil==2.9.0.20240906 # via dune-client types-pytz==2024.2.0.20240913 @@ -207,6 +205,7 @@ typing-extensions==4.12.2 # eth-typing # moralis # mypy + # psycopg # pydantic # pydantic-core # sqlalchemy From e7511ce3a8506e9671ce349a82614f4701efc0e9 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 13:23:18 +0200 Subject: [PATCH 17/39] fetch decimals --- src/helpers/config.py | 2 +- src/helpers/database.py | 36 +++++++++++++++++++++++++++++++++++- src/transaction_processor.py | 18 +++++++++--------- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/helpers/config.py b/src/helpers/config.py index 93ea485..5715635 100644 --- a/src/helpers/config.py +++ b/src/helpers/config.py @@ -51,7 +51,7 @@ def create_db_connection(db_type: str) -> Engine: if not db_url: raise ValueError(f"{db_type} database URL not found in environment variables.") - return create_engine(f"postgresql+psycopg2://{db_url}") + return create_engine(f"postgresql+psycopg://{db_url}") def check_db_connection(engine: Engine, db_type: str) -> Engine: diff --git a/src/helpers/database.py b/src/helpers/database.py index 70cf2c5..e7aa608 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -1,7 +1,7 @@ from datetime import datetime from hexbytes import HexBytes -from sqlalchemy import text +from sqlalchemy import text, insert, Table, Column, Integer, LargeBinary, MetaData from sqlalchemy.engine import Engine from src.helpers.config import check_db_connection, logger @@ -190,3 +190,37 @@ def get_latest_transaction(self) -> str | None: latest_tx_hash = HexBytes(result[0]).to_0x_hex() return latest_tx_hash + + def get_tokens_without_decimals(self) -> list[str]: + """Get tokens without decimals.""" + query = ( + "SELECT token_address FROM transaction_tokens " + "WHERE token_address not in (SELECT token_address FROM token_decimals);" + ) + result = self.execute_query(query, {}).fetchall() + return list({HexBytes(row[0]).to_0x_hex() for row in result}) + + def write_token_decimals(self, token_decimals: list[tuple[str, int]]) -> None: + self.engine = check_db_connection(self.engine, "solver_slippage") + + # Define the table without creating a model class + token_decimals_table = Table( + "token_decimals", + MetaData(), + Column("token_address", LargeBinary, primary_key=True), + Column("decimals", Integer, nullable=False), + ) + + # Prepare the data + values = [ + {"token_address": bytes.fromhex(token_address[2:]), "decimals": decimals} + for token_address, decimals in token_decimals + ] + + # Create the insert statement + stmt = insert(token_decimals_table).values(values) + + # Execute the bulk insert + with self.engine.connect() as conn: + conn.execute(stmt) + conn.commit() diff --git a/src/transaction_processor.py b/src/transaction_processor.py index b1b65e9..d85b01f 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -4,16 +4,17 @@ from web3 import Web3 from src.fees.compute_fees import compute_all_fees_of_batch -from src.helpers.blockchain_data import BlockchainData -from src.helpers.config import CHAIN_SLEEP_TIME, logger -from src.helpers.database import Database -from src.helpers.helper_functions import read_sql_file, set_params -from src.imbalances_script import ( - RawTokenImbalances, +from src.helpers.blockchain_data import ( + BlockchainData, get_transaction_timestamp, get_transaction_tokens, ) +from src.helpers.config import CHAIN_SLEEP_TIME, logger +from src.helpers.database import Database +from src.helpers.helper_functions import read_sql_file, set_params +from src.imbalances_script import RawTokenImbalances from src.price_providers.price_feed import PriceFeed +from src.token_decimals import update_token_decimals # pylint: disable=logging-fstring-interpolation @@ -143,9 +144,8 @@ def process_single_transaction( # store transaction tokens self.db.write_transaction_tokens(transaction_tokens) - # get token decimals - - # store token decimals + # update token decimals + update_token_decimals(self.db.engine, self.blockchain_data.web3) # get prices prices_new = self.get_prices_for_tokens( From 9dcdda1e966c0f86da330ba9b7c817b8da053682 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 13:24:25 +0200 Subject: [PATCH 18/39] additional file --- src/token_decimals.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 src/token_decimals.py diff --git a/src/token_decimals.py b/src/token_decimals.py new file mode 100644 index 0000000..3675a74 --- /dev/null +++ b/src/token_decimals.py @@ -0,0 +1,31 @@ +from os import getenv + +from dotenv import load_dotenv +from sqlalchemy import create_engine +from web3 import Web3 + +from src.helpers.blockchain_data import get_token_decimals +from src.helpers.database import Database + + +load_dotenv() + + +def update_token_decimals(engine, web3) -> None: + db = Database(engine, "mainnet") + + token_addresses = db.get_tokens_without_decimals() + + token_decimals = [ + (token_address, get_token_decimals(token_address, web3)) + for token_address in token_addresses + ] + if token_decimals: + db.write_token_decimals(token_decimals) + + +if __name__ == "__main__": + engine = create_engine(f"postgresql+psycopg2://{getenv('SOLVER_SLIPPAGE_DB_URL')}") + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + + update_token_decimals(engine, web3) From 93b86b757c4e9a4103f41939e0dc16521a31a65e Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 14:11:41 +0200 Subject: [PATCH 19/39] use larger number for price in database --- database/01_table_creation.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/01_table_creation.sql b/database/01_table_creation.sql index b7c4255..0bb9476 100644 --- a/database/01_table_creation.sql +++ b/database/01_table_creation.sql @@ -15,7 +15,7 @@ CREATE TYPE PriceSource AS ENUM ('coingecko', 'moralis', 'dune', 'native'); CREATE TABLE prices ( token_address bytea NOT NULL, time timestamp NOT NULL, - price numeric(60, 18) NOT NULL, + price numeric(78, 18) NOT NULL, source PriceSource NOT NULL, PRIMARY KEY (token_address, time, source) From ad29adb37ed73ba85b0270246f39a1dff022158b Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 15:47:26 +0200 Subject: [PATCH 20/39] reorganize tests --- .github/workflows/pull_request.yaml | 2 +- tests/{ => legacy}/basic_test.py | 0 tests/{ => legacy}/compare_imbalances.py | 0 tests/{e2e => legacy}/test_fees.py | 0 tests/unit/__init__.py | 0 5 files changed, 1 insertion(+), 1 deletion(-) rename tests/{ => legacy}/basic_test.py (100%) rename tests/{ => legacy}/compare_imbalances.py (100%) rename tests/{e2e => legacy}/test_fees.py (100%) create mode 100644 tests/unit/__init__.py diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 7b79f67..45fcf82 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -43,7 +43,7 @@ jobs: - name: Type Check (mypy) run: mypy src - name: Tests - run: pytest tests/e2e/test_blockchain_data.py tests/e2e/test_imbalances_script.py + run: pytest tests/unit/ tests/e2e/ env: NODE_URL: ${{ secrets.NODE_URL }} SOLVER_SLIPPAGE_DB_URL: postgres:postgres@localhost:5432/mainnet diff --git a/tests/basic_test.py b/tests/legacy/basic_test.py similarity index 100% rename from tests/basic_test.py rename to tests/legacy/basic_test.py diff --git a/tests/compare_imbalances.py b/tests/legacy/compare_imbalances.py similarity index 100% rename from tests/compare_imbalances.py rename to tests/legacy/compare_imbalances.py diff --git a/tests/e2e/test_fees.py b/tests/legacy/test_fees.py similarity index 100% rename from tests/e2e/test_fees.py rename to tests/legacy/test_fees.py diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 From 2eecf88b40ba76a83b9ff157528076aa27b60668 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 15:48:29 +0200 Subject: [PATCH 21/39] catch exception when writing large price to database --- src/helpers/database.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index e7aa608..c10bf48 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -1,6 +1,7 @@ from datetime import datetime from hexbytes import HexBytes +import psycopg from sqlalchemy import text, insert, Table, Column, Integer, LargeBinary, MetaData from sqlalchemy.engine import Engine @@ -169,15 +170,21 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: "VALUES (:token_address, :time, :price, :source);" ) for token_address, time, price, source in prices: - self.execute_and_commit( - query, - { - "token_address": bytes.fromhex(token_address[2:]), - "time": datetime.fromtimestamp(time), - "price": price, - "source": source, - }, - ) + try: + self.execute_and_commit( + query, + { + "token_address": bytes.fromhex(token_address[2:]), + "time": datetime.fromtimestamp(time), + "price": price, + "source": source, + }, + ) + except psycopg.errors.NumericValueOutOfRange: + logger.warning( + f"Error while writing price data. token: {token_address}, " + f"time: {time}, price: {price}, source: {source}" + ) def get_latest_transaction(self) -> str | None: """Get latest transaction hash. From 02979846f3fd7327dd4e40c239281aff40b9675f Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 16:50:23 +0200 Subject: [PATCH 22/39] use all available price feeds --- src/price_providers/price_feed.py | 14 +++++++++----- src/transaction_processor.py | 9 +++++---- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/price_providers/price_feed.py b/src/price_providers/price_feed.py index 412af3a..c8b96ec 100644 --- a/src/price_providers/price_feed.py +++ b/src/price_providers/price_feed.py @@ -8,10 +8,14 @@ NATIVE_TOKEN = HexStr("0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE") +# pylint: disable=logging-fstring-interpolation + class PriceFeed: """Class encapsulating the different price providers.""" + # pylint: disable=too-few-public-methods + def __init__(self, activate: bool): if activate: self.providers = [ @@ -22,17 +26,17 @@ def __init__(self, activate: bool): else: self.providers = [] - def get_price(self, price_params: dict) -> tuple[float, str] | None: + def get_price(self, price_params: dict) -> list[tuple[float, str]]: """Function iterates over list of price provider objects and attempts to get a price.""" + prices: list[tuple[float, str]] = [] for provider in self.providers: try: - price: float | None = None if HexStr(price_params["token_address"]) == NATIVE_TOKEN: - price = 1.0 + price: float | None = 1.0 else: price = provider.get_price(price_params) if price is not None: - return price, provider.name + prices.append((price, provider.name)) except Exception as e: logger.error(f"Error getting price from provider {provider.name}: {e}") - return None + return prices diff --git a/src/transaction_processor.py b/src/transaction_processor.py index d85b01f..43893c3 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -258,9 +258,10 @@ def get_prices_for_tokens( set_params(token_address, block_number, tx_hash) ) if price_data: - prices.append( - (token_address, timestamp, price_data[0], price_data[1]) - ) + prices += [ + (token_address, timestamp, price, source) + for price, source in price_data + ] else: logger.warning( f"Failed to fetch price for token {token_address} and" @@ -285,7 +286,7 @@ def process_prices_for_tokens( set_params(token_address, block_number, tx_hash) ) if price_data: - price, source = price_data + price, source = price_data[0] prices[token_address] = (price, source) except Exception as e: logger.error(f"Failed to process prices for transaction {tx_hash}: {e}") From 7ed41a581c4fc4569b052895ffda3f0f1e3776d3 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 16:51:49 +0200 Subject: [PATCH 23/39] add additional preliminary e2e test --- tests/e2e/test_transaction_processor.py | 31 +++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 tests/e2e/test_transaction_processor.py diff --git a/tests/e2e/test_transaction_processor.py b/tests/e2e/test_transaction_processor.py new file mode 100644 index 0000000..8c602ef --- /dev/null +++ b/tests/e2e/test_transaction_processor.py @@ -0,0 +1,31 @@ +from os import getenv, environ +from unittest.mock import Mock + +from src.helpers.config import initialize_connections +from src.transaction_processor import TransactionProcessor +from src.helpers.database import Database +from src.helpers.blockchain_data import BlockchainData + + +def tests_process_single_transaction(): + chain_name = "mainnet" + web3, db_engine = initialize_connections() + blockchain = BlockchainData(web3) + db = Database(db_engine, chain_name) + + process_imbalances = True + process_fees = False + process_prices = True + + processor = TransactionProcessor( + blockchain, db, chain_name, process_imbalances, process_fees, process_prices + ) + + # delete data + + # process hash + processor.process_single_transaction( + "0x68e7183363be7460642e78ab09a2898c8aeac6657c2434f7b318f54590c46299", + 9481594, + 20935017, + ) From 155612d6b2997a832dc134fd72b5077556fa86f4 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 18:15:35 +0200 Subject: [PATCH 24/39] add test for writing prices to database --- tests/unit/test_database.py | 62 +++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 392bb2c..95af032 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -1,26 +1,12 @@ -from os import getenv, environ -from unittest.mock import patch +from datetime import datetime from hexbytes import HexBytes -import pytest from sqlalchemy import create_engine, text +from src.helpers.database import Database -@pytest.fixture() -def set_env_variables(monkeypatch): - with patch.dict(environ, clear=True): - envvars = { - "CHAIN_SLEEP_TIME": "1", - } - for k, v in envvars.items(): - monkeypatch.setenv(k, v) - yield # This is the magical bit which restore the environment after - - -def tests_write_transaction_timestamp(set_env_variables): - # import has to happen after patching environment variable - from src.helpers.database import Database +def tests_write_transaction_timestamp(): engine = create_engine( f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" ) @@ -48,9 +34,8 @@ def tests_write_transaction_timestamp(set_env_variables): assert res[1].timestamp() == 1728044411 -def tests_write_transaction_tokens(set_env_variables): +def tests_write_transaction_tokens(): # import has to happen after patching environment variable - from src.helpers.database import Database engine = create_engine( f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" @@ -82,7 +67,44 @@ def tests_write_transaction_tokens(set_env_variables): assert HexBytes(res[i][1]) == HexBytes(token_address) -def test_get_latest_transaction(set_env_variables): +def tests_write_prices(): + engine = create_engine( + f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + token_prices = [ + ( + "0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48", + int(datetime.fromisoformat("2024-10-10 16:48:47.000000").timestamp()), + 0.000420454193230350, + "coingecko", + ), + ( + "0x68BBED6A47194EFF1CF514B50EA91895597FC91E", + int(datetime.fromisoformat("2024-10-10 16:49:47.000000").timestamp()), + 0.000000050569218629, + "moralis", + ), + ] + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE prices")) + conn.commit() + # write data + db.write_prices_new(token_prices) + # read data + with engine.connect() as conn: + res = conn.execute( + text("SELECT token_address, time, price, source FROM prices") + ).all() + for i, (token_address, time, price, source) in enumerate(token_prices): + assert HexBytes(res[i][0]) == HexBytes(token_address) + assert res[i][1].timestamp() == time + assert float(res[i][2]) == price + assert res[i][3] == source + + +def test_get_latest_transaction(): # import has to happen after patching environment variable from src.helpers.database import Database From 52392c84b74c16c3bbf4c7043ca326a18ab0f390 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 18:17:42 +0200 Subject: [PATCH 25/39] fix docstrings --- src/helpers/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index c10bf48..868a38b 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -133,7 +133,7 @@ def write_fees( def write_transaction_timestamp( self, transaction_timestamp: tuple[str, int] ) -> None: - """Function writes the transaction timestamp to the table.""" + """Writes the transaction timestamp to database.""" query = ( "INSERT INTO transaction_timestamps (tx_hash, time) " "VALUES (:tx_hash, :time);" @@ -149,7 +149,7 @@ def write_transaction_timestamp( def write_transaction_tokens( self, transaction_tokens: list[tuple[str, str]] ) -> None: - """Function writes the transaction timestamp to the table.""" + """Writes the transaction tokens to the database.""" query = ( "INSERT INTO transaction_tokens (tx_hash, token_address) " "VALUES (:tx_hash, :token_address);" From d76b4b9ee6b5801d9ee5a31b55ac1b0a7c30bde9 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 18:19:45 +0200 Subject: [PATCH 26/39] fix tests --- src/token_decimals.py | 2 +- tests/unit/test_database.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/token_decimals.py b/src/token_decimals.py index 3675a74..a5ef1db 100644 --- a/src/token_decimals.py +++ b/src/token_decimals.py @@ -25,7 +25,7 @@ def update_token_decimals(engine, web3) -> None: if __name__ == "__main__": - engine = create_engine(f"postgresql+psycopg2://{getenv('SOLVER_SLIPPAGE_DB_URL')}") + engine = create_engine(f"postgresql+psycopg://{getenv('SOLVER_SLIPPAGE_DB_URL')}") web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) update_token_decimals(engine, web3) diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 95af032..a6c54a4 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -8,7 +8,7 @@ def tests_write_transaction_timestamp(): engine = create_engine( - f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" + f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" ) db = Database(engine, "mainnet") # truncate table @@ -38,7 +38,7 @@ def tests_write_transaction_tokens(): # import has to happen after patching environment variable engine = create_engine( - f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" + f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" ) db = Database(engine, "mainnet") transaction_tokens = [ @@ -69,7 +69,7 @@ def tests_write_transaction_tokens(): def tests_write_prices(): engine = create_engine( - f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" + f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" ) db = Database(engine, "mainnet") token_prices = [ @@ -109,7 +109,7 @@ def test_get_latest_transaction(): from src.helpers.database import Database engine = create_engine( - f"postgresql+psycopg2://postgres:postgres@localhost:5432/mainnet" + f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" ) db = Database(engine, "mainnet") # truncate table From 50e2352a2b0354228a7e5d22f10a5043e0b02b79 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 18:37:38 +0200 Subject: [PATCH 27/39] rename timestamp table --- database/01_table_creation.sql | 2 +- src/helpers/database.py | 4 ++-- tests/unit/test_database.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/database/01_table_creation.sql b/database/01_table_creation.sql index 0bb9476..6d43302 100644 --- a/database/01_table_creation.sql +++ b/database/01_table_creation.sql @@ -1,4 +1,4 @@ -CREATE TABLE transaction_timestamps ( +CREATE TABLE transaction_timestamp ( tx_hash bytea PRIMARY KEY, time timestamp NOT NULL ); diff --git a/src/helpers/database.py b/src/helpers/database.py index 868a38b..a97c6b8 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -135,7 +135,7 @@ def write_transaction_timestamp( ) -> None: """Writes the transaction timestamp to database.""" query = ( - "INSERT INTO transaction_timestamps (tx_hash, time) " + "INSERT INTO transaction_timestamp (tx_hash, time) " "VALUES (:tx_hash, :time);" ) self.execute_and_commit( @@ -189,7 +189,7 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: def get_latest_transaction(self) -> str | None: """Get latest transaction hash. If no transaction is found, return None.""" - query = "SELECT tx_hash FROM transaction_timestamps ORDER BY time DESC LIMIT 1;" + query = "SELECT tx_hash FROM transaction_timestamp ORDER BY time DESC LIMIT 1;" result = self.execute_query(query, {}).fetchone() if result is None: diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index a6c54a4..90b3ff0 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -13,7 +13,7 @@ def tests_write_transaction_timestamp(): db = Database(engine, "mainnet") # truncate table with engine.connect() as conn: - conn.execute(text("TRUNCATE transaction_timestamps")) + conn.execute(text("TRUNCATE transaction_timestamp")) conn.commit() # write data db.write_transaction_timestamp( @@ -25,7 +25,7 @@ def tests_write_transaction_timestamp(): # read data with engine.connect() as conn: res = conn.execute( - text("SELECT tx_hash, time FROM transaction_timestamps") + text("SELECT tx_hash, time FROM transaction_timestamp") ).one() assert ( "0x" + bytes(res[0]).hex() @@ -114,7 +114,7 @@ def test_get_latest_transaction(): db = Database(engine, "mainnet") # truncate table with engine.connect() as conn: - conn.execute(text("TRUNCATE transaction_timestamps")) + conn.execute(text("TRUNCATE transaction_timestamp")) conn.commit() # check that empty table returns None assert db.get_latest_transaction() is None From d1085972f69e4dd34977d36a49be30d5b73e0767 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 18:38:04 +0200 Subject: [PATCH 28/39] remove pandas dependency --- requirements.in | 2 -- requirements.txt | 17 +---------------- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/requirements.in b/requirements.in index 42f0597..c95bacb 100644 --- a/requirements.in +++ b/requirements.in @@ -1,7 +1,5 @@ dune-client moralis -pandas -pandas-stubs psycopg python-dotenv requests diff --git a/requirements.txt b/requirements.txt index ccd95c2..b844005 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.12 # by the following command: # -# pip-compile +# pip-compile requirements.in # aiohappyeyeballs==2.4.0 # via aiohttp @@ -116,19 +116,11 @@ mypy-extensions==1.0.0 # typing-inspect ndjson==0.3.1 # via dune-client -numpy==1.26.4 - # via - # pandas - # pandas-stubs packaging==24.1 # via # black # marshmallow # pytest -pandas==2.2.1 - # via -r requirements.in -pandas-stubs==2.2.2.240909 - # via -r requirements.in parsimonious==0.10.0 # via eth-abi pathspec==0.12.1 @@ -159,11 +151,8 @@ python-dateutil==2.9.0.post0 # via # dune-client # moralis - # pandas python-dotenv==1.0.0 # via -r requirements.in -pytz==2024.2 - # via pandas pyunormalize==16.0.0 # via web3 regex==2024.9.11 @@ -189,8 +178,6 @@ types-deprecated==1.2.9.20240311 # via dune-client types-python-dateutil==2.9.0.20240906 # via dune-client -types-pytz==2024.2.0.20240913 - # via pandas-stubs types-pyyaml==6.0.12.20240917 # via dune-client types-requests==2.32.0.20240914 @@ -213,8 +200,6 @@ typing-extensions==4.12.2 # web3 typing-inspect==0.9.0 # via dataclasses-json -tzdata==2024.1 - # via pandas urllib3==2.2.3 # via # moralis From af19c55c51de8b4057c21e090eb7032640c46653 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 19:26:40 +0200 Subject: [PATCH 29/39] change database setup --- .github/workflows/pull_request.yaml | 3 ++- Dockerfile.test_db | 1 - Makefile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 45fcf82..317ae22 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -22,7 +22,8 @@ jobs: - 5432:5432 # Copy repo contents into container (needed to populate DB) volumes: - - ${{ github.workspace }}/database:/docker-entrypoint-initdb.d + - ${{ github.workspace }}/database/00_legacy_tables.sql:/docker-entrypoint-initdb.d/00_legacy_tables.sql + - ${{ github.workspace }}/database/01_table_creation.sql:/docker-entrypoint-initdb.d/01_table_creation.sql steps: - uses: actions/checkout@v3 - name: Setup Python 3.12 diff --git a/Dockerfile.test_db b/Dockerfile.test_db index b8b1064..a143cef 100644 --- a/Dockerfile.test_db +++ b/Dockerfile.test_db @@ -1,4 +1,3 @@ FROM postgres ENV POSTGRES_PASSWORD=postgres ENV POSTGRES_DB=mainnet -COPY ./database/* /docker-entrypoint-initdb.d/ diff --git a/Makefile b/Makefile index 0880e0e..0f8fb8d 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ daemon: test_db: docker build -t $(DOCKER_IMAGE_NAME) -f Dockerfile.test_db . - docker run -d --name $(DOCKER_CONTAINER_NAME) -p $(DB_PORT):$(DB_PORT) $(DOCKER_IMAGE_NAME) + docker run -d --name $(DOCKER_CONTAINER_NAME) -p $(DB_PORT):$(DB_PORT) -v ${PWD}/database/00_legacy_tables.sql:/docker-entrypoint-initdb.d/00_legacy_tables.sql -v ${PWD}/database/01_table_creation.sql:/docker-entrypoint-initdb.d/01_table_creation.sql $(DOCKER_IMAGE_NAME) stop_test_db: docker stop $(DOCKER_CONTAINER_NAME) || true From 943813edf426aa5eed23920142299d7854670335 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 19:38:46 +0200 Subject: [PATCH 30/39] debugging postgres setup --- .github/workflows/pull_request.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 317ae22..67adff9 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -22,7 +22,7 @@ jobs: - 5432:5432 # Copy repo contents into container (needed to populate DB) volumes: - - ${{ github.workspace }}/database/00_legacy_tables.sql:/docker-entrypoint-initdb.d/00_legacy_tables.sql +# - ${{ github.workspace }}/database/00_legacy_tables.sql:/docker-entrypoint-initdb.d/00_legacy_tables.sql - ${{ github.workspace }}/database/01_table_creation.sql:/docker-entrypoint-initdb.d/01_table_creation.sql steps: - uses: actions/checkout@v3 From 83d00b97ef5778d7225b35f513a175911f4cc4f5 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Thu, 10 Oct 2024 19:53:48 +0200 Subject: [PATCH 31/39] next try for db setup --- .github/workflows/pull_request.yaml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 67adff9..c6a5a51 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -20,12 +20,13 @@ jobs: --health-retries 5 ports: - 5432:5432 - # Copy repo contents into container (needed to populate DB) - volumes: -# - ${{ github.workspace }}/database/00_legacy_tables.sql:/docker-entrypoint-initdb.d/00_legacy_tables.sql - - ${{ github.workspace }}/database/01_table_creation.sql:/docker-entrypoint-initdb.d/01_table_creation.sql steps: - uses: actions/checkout@v3 + - name: Initialize database + run: | + psql -h localhost -U postgres -d mainnet -f ${{ github.workspace }}/database/01_table_creation.sql + env: + PGPASSWORD: postgres - name: Setup Python 3.12 uses: actions/setup-python@v3 with: From 3f493a23fabe9a37e8fdcd0fa6d24256077e65ad Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Fri, 11 Oct 2024 11:39:59 +0200 Subject: [PATCH 32/39] make coingecko and moralis api keys optional --- src/price_providers/coingecko_pricing.py | 16 ++++++++++------ src/price_providers/moralis_pricing.py | 3 +++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/price_providers/coingecko_pricing.py b/src/price_providers/coingecko_pricing.py index 9933e69..069e209 100644 --- a/src/price_providers/coingecko_pricing.py +++ b/src/price_providers/coingecko_pricing.py @@ -1,8 +1,9 @@ import os import time + import requests -import json from web3 import Web3 + from src.price_providers.pricing_model import AbstractPriceProvider from src.helpers.config import logger, get_web3_instance from src.helpers.helper_functions import get_finalized_block_number, extract_params @@ -31,11 +32,14 @@ def __init__(self) -> None: def name(self) -> str: return "coingecko" - def fetch_coingecko_list(self) -> list[dict]: + def fetch_coingecko_list(self) -> list[dict] | None: """ Fetch and filter the list of tokens (currently filters only Ethereum) from the Coingecko API. """ + if not coingecko_api_key: + logger.warning("Coingecko API key is not set.") + return None url = ( f"https://pro-api.coingecko.com/api/v3/coins/" f"list?include_platform=true&status=active" @@ -47,7 +51,7 @@ def fetch_coingecko_list(self) -> list[dict]: headers["x-cg-pro-api-key"] = coingecko_api_key response = requests.get(url, headers=headers) - tokens_list = json.loads(response.text) + tokens_list = response.json() return [ {"id": item["id"], "platforms": {"ethereum": item["platforms"]["ethereum"]}} for item in tokens_list @@ -82,9 +86,6 @@ def fetch_api_price( """ Makes call to Coingecko API to fetch price, between a start and end timestamp. """ - if not coingecko_api_key: - logger.warning("Coingecko API key is not set.") - return None # price of token is returned in ETH url = ( f"https://pro-api.coingecko.com/api/v3/coins/{token_id}/market_chart/range" @@ -122,6 +123,9 @@ def get_price(self, price_params: dict) -> float | None: Function returns coingecko price for a token address, closest to and at least as large as the block timestamp for a given tx hash. """ + if not coingecko_api_key: + logger.warning("Coingecko API key is not set.") + return None token_address, block_number = extract_params(price_params, is_block=True) block_start_timestamp = self.web3.eth.get_block(block_number)["timestamp"] if self.price_not_retrievable(block_start_timestamp): diff --git a/src/price_providers/moralis_pricing.py b/src/price_providers/moralis_pricing.py index bc4bb57..affc793 100644 --- a/src/price_providers/moralis_pricing.py +++ b/src/price_providers/moralis_pricing.py @@ -37,6 +37,9 @@ def get_price(self, price_params: dict) -> float | None: Price returned is closest to and at least as large as block timestamp. """ try: + if os.getenv("MORALIS_API_KEY") is None: + self.logger.warning("Moralis API key is not set.") + return None token_address, block_number = extract_params(price_params, is_block=True) params = { "chain": "eth", From 44aa0637dce05d82babbe58f7c95faf722756411 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Fri, 11 Oct 2024 11:40:35 +0200 Subject: [PATCH 33/39] simplify token decimals writing setup --- src/helpers/database.py | 12 +++--------- tests/unit/test_database.py | 2 +- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/helpers/database.py b/src/helpers/database.py index a97c6b8..a3c4bd1 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -212,22 +212,16 @@ def write_token_decimals(self, token_decimals: list[tuple[str, int]]) -> None: # Define the table without creating a model class token_decimals_table = Table( - "token_decimals", - MetaData(), - Column("token_address", LargeBinary, primary_key=True), - Column("decimals", Integer, nullable=False), + "token_decimals", MetaData(), autoload_with=self.engine ) # Prepare the data - values = [ + records = [ {"token_address": bytes.fromhex(token_address[2:]), "decimals": decimals} for token_address, decimals in token_decimals ] - # Create the insert statement - stmt = insert(token_decimals_table).values(values) - # Execute the bulk insert with self.engine.connect() as conn: - conn.execute(stmt) + conn.execute(token_decimals_table.insert(), records) conn.commit() diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 90b3ff0..18d1302 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -8,7 +8,7 @@ def tests_write_transaction_timestamp(): engine = create_engine( - f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" + "postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" ) db = Database(engine, "mainnet") # truncate table From 12e49a8933dc3d40c735d34a51d1426eca2c71bb Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Fri, 11 Oct 2024 11:50:56 +0200 Subject: [PATCH 34/39] handle missing coingecko keys in one more place --- src/price_providers/coingecko_pricing.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/price_providers/coingecko_pricing.py b/src/price_providers/coingecko_pricing.py index 069e209..f2e0b48 100644 --- a/src/price_providers/coingecko_pricing.py +++ b/src/price_providers/coingecko_pricing.py @@ -75,9 +75,12 @@ def get_token_id_by_address(self, token_address: str) -> str | None: self.last_reload_time = ( time.time() ) # update the last reload time to current time - for token in self.filtered_token_list: - if token["platforms"].get("ethereum") == token_address: - return token["id"] + if ( + self.filtered_token_list is not None + ): # TODO: handle missing keys more systematically + for token in self.filtered_token_list: + if token["platforms"].get("ethereum") == token_address: + return token["id"] return None def fetch_api_price( From 119658e47ce0fc5942c6fe8379fbe51e80526aa9 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Fri, 11 Oct 2024 11:51:52 +0200 Subject: [PATCH 35/39] move all blockchain fetching into blockchain type --- src/helpers/blockchain_data.py | 45 +++++++++---------- .../endpoint_auction_pricing.py | 6 +-- src/token_decimals.py | 16 +++---- src/transaction_processor.py | 14 ++---- tests/e2e/test_blockchain_data.py | 12 +++-- 5 files changed, 41 insertions(+), 52 deletions(-) diff --git a/src/helpers/blockchain_data.py b/src/helpers/blockchain_data.py index 78ce94e..072e9c4 100644 --- a/src/helpers/blockchain_data.py +++ b/src/helpers/blockchain_data.py @@ -75,33 +75,30 @@ def get_auction_id(self, tx_hash: str) -> int: auction_id = int.from_bytes(call_data_bytes[-8:], byteorder="big") return auction_id + def get_transaction_timestamp(self, tx_hash: str) -> tuple[str, int]: + receipt = self.web3.eth.get_transaction_receipt(HexStr(tx_hash)) + block_number = receipt["blockNumber"] + block = self.web3.eth.get_block(block_number) + timestamp = block["timestamp"] -def get_transaction_timestamp(tx_hash: str, web3: Web3) -> tuple[str, int]: - receipt = web3.eth.get_transaction_receipt(HexStr(tx_hash)) - block_number = receipt["blockNumber"] - block = web3.eth.get_block(block_number) - timestamp = block["timestamp"] + return tx_hash, timestamp - return tx_hash, timestamp + def get_transaction_tokens(self, tx_hash: str) -> list[tuple[str, str]]: + receipt = self.web3.eth.get_transaction_receipt(HexStr(tx_hash)) + transfer_topic = self.web3.keccak(text="Transfer(address,address,uint256)") -def get_transaction_tokens(tx_hash: str, web3: Web3) -> list[tuple[str, str]]: - receipt = web3.eth.get_transaction_receipt(HexStr(tx_hash)) + token_addresses: set[str] = set() + for log in receipt["logs"]: + if log["topics"] and log["topics"][0] == transfer_topic: + token_address = log["address"] + token_addresses.add(token_address) - transfer_topic = web3.keccak(text="Transfer(address,address,uint256)") + return [(tx_hash, token_address) for token_address in token_addresses] - token_addresses: set[str] = set() - for log in receipt["logs"]: - if log["topics"] and log["topics"][0] == transfer_topic: - token_address = log["address"] - token_addresses.add(token_address) - - return [(tx_hash, token_address) for token_address in token_addresses] - - -def get_token_decimals(token_address: str, web3: Web3) -> int: - """Get number of decimals for a token.""" - contract = web3.eth.contract( - address=Web3.to_checksum_address(token_address), abi=erc20_abi - ) - return contract.functions.decimals().call() + def get_token_decimals(self, token_address: str) -> int: + """Get number of decimals for a token.""" + contract = self.web3.eth.contract( + address=Web3.to_checksum_address(token_address), abi=erc20_abi + ) + return contract.functions.decimals().call() diff --git a/src/price_providers/endpoint_auction_pricing.py b/src/price_providers/endpoint_auction_pricing.py index f84334e..ff4012a 100644 --- a/src/price_providers/endpoint_auction_pricing.py +++ b/src/price_providers/endpoint_auction_pricing.py @@ -1,7 +1,7 @@ import requests from src.price_providers.pricing_model import AbstractPriceProvider -from src.helpers.blockchain_data import get_token_decimals +from src.helpers.blockchain_data import BlockchainData from src.helpers.config import get_web3_instance, logger from src.helpers.helper_functions import extract_params @@ -10,7 +10,7 @@ class AuctionPriceProvider(AbstractPriceProvider): """Fetch auction prices.""" def __init__(self) -> None: - self.web3 = get_web3_instance() + self.blockchain = BlockchainData(get_web3_instance()) self.endpoint_urls = { "prod": f"https://api.cow.fi/mainnet/api/v1/solver_competition/by_tx_hash/", "barn": f"https://barn.api.cow.fi/mainnet/api/v1/solver_competition/by_tx_hash/", @@ -42,7 +42,7 @@ def get_price(self, price_params: dict) -> float | None: return None # calculation for converting auction price from endpoint to ETH equivalent per token unit price_in_eth = (float(price) / 10**18) * ( - 10 ** get_token_decimals(token_address, self.web3) / 10**18 + 10 ** self.blockchain.get_token_decimals(token_address) / 10**18 ) return price_in_eth diff --git a/src/token_decimals.py b/src/token_decimals.py index a5ef1db..70b9e29 100644 --- a/src/token_decimals.py +++ b/src/token_decimals.py @@ -4,28 +4,28 @@ from sqlalchemy import create_engine from web3 import Web3 -from src.helpers.blockchain_data import get_token_decimals +from src.helpers.blockchain_data import BlockchainData from src.helpers.database import Database load_dotenv() -def update_token_decimals(engine, web3) -> None: - db = Database(engine, "mainnet") - - token_addresses = db.get_tokens_without_decimals() +def update_token_decimals(database: Database, blockchain: BlockchainData) -> None: + token_addresses = database.get_tokens_without_decimals() token_decimals = [ - (token_address, get_token_decimals(token_address, web3)) + (token_address, blockchain.get_token_decimals(token_address)) for token_address in token_addresses ] if token_decimals: - db.write_token_decimals(token_decimals) + database.write_token_decimals(token_decimals) if __name__ == "__main__": engine = create_engine(f"postgresql+psycopg://{getenv('SOLVER_SLIPPAGE_DB_URL')}") + database = Database(engine, "mainnet") web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + blockchain_data = BlockchainData(web3) - update_token_decimals(engine, web3) + update_token_decimals(database, blockchain_data) diff --git a/src/transaction_processor.py b/src/transaction_processor.py index 43893c3..dd64ef0 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -4,11 +4,7 @@ from web3 import Web3 from src.fees.compute_fees import compute_all_fees_of_batch -from src.helpers.blockchain_data import ( - BlockchainData, - get_transaction_timestamp, - get_transaction_tokens, -) +from src.helpers.blockchain_data import BlockchainData from src.helpers.config import CHAIN_SLEEP_TIME, logger from src.helpers.database import Database from src.helpers.helper_functions import read_sql_file, set_params @@ -131,16 +127,14 @@ def process_single_transaction( self.log_message = [] try: # get transaction timestamp - transaction_timestamp = get_transaction_timestamp( - tx_hash, self.blockchain_data.web3 + transaction_timestamp = self.blockchain_data.get_transaction_timestamp( + tx_hash ) # store transaction timestamp self.db.write_transaction_timestamp(transaction_timestamp) # get transaction tokens - transaction_tokens = get_transaction_tokens( - tx_hash, self.blockchain_data.web3 - ) + transaction_tokens = self.blockchain_data.get_transaction_tokens(tx_hash) # store transaction tokens self.db.write_transaction_tokens(transaction_tokens) diff --git a/tests/e2e/test_blockchain_data.py b/tests/e2e/test_blockchain_data.py index 49e18bf..370b33d 100644 --- a/tests/e2e/test_blockchain_data.py +++ b/tests/e2e/test_blockchain_data.py @@ -2,11 +2,7 @@ from web3 import Web3 -from src.helpers.blockchain_data import ( - BlockchainData, - get_transaction_timestamp, - get_transaction_tokens, -) +from src.helpers.blockchain_data import BlockchainData def tests_get_tx_hashes_blocks(): @@ -23,18 +19,20 @@ def tests_get_tx_hashes_blocks(): def test_get_transaction_timestamp(): web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + blockchain = BlockchainData(web3) tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - transaction_timestamp = get_transaction_timestamp(tx_hash, web3) + transaction_timestamp = blockchain.get_transaction_timestamp(tx_hash) assert transaction_timestamp == (tx_hash, 1728044411) def test_get_transaction_tokens(): web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + blockchain = BlockchainData(web3) tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - transaction_tokens = get_transaction_tokens(tx_hash, web3) + transaction_tokens = blockchain.get_transaction_tokens(tx_hash) assert all(h == tx_hash for h, _ in transaction_tokens) assert set(token_address for _, token_address in transaction_tokens) == { From 68513eda31967c78943ebae9037d4229d04832a7 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Fri, 11 Oct 2024 12:09:47 +0200 Subject: [PATCH 36/39] change database setup in CI --- .github/workflows/pull_request.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index c6a5a51..35af45f 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -24,7 +24,9 @@ jobs: - uses: actions/checkout@v3 - name: Initialize database run: | - psql -h localhost -U postgres -d mainnet -f ${{ github.workspace }}/database/01_table_creation.sql + for file in ${{ github.workspace }}/database/*.sql; do + psql -h localhost -U postgres -d mainnet -f "$file" + done env: PGPASSWORD: postgres - name: Setup Python 3.12 From aeb7652295e2a70c61cdc23b8ef6b477f64f4d4e Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Fri, 11 Oct 2024 12:22:56 +0200 Subject: [PATCH 37/39] fix bug in getting token balances --- src/transaction_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transaction_processor.py b/src/transaction_processor.py index dd64ef0..1f31c02 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -139,7 +139,7 @@ def process_single_transaction( self.db.write_transaction_tokens(transaction_tokens) # update token decimals - update_token_decimals(self.db.engine, self.blockchain_data.web3) + update_token_decimals(self.db, self.blockchain_data) # get prices prices_new = self.get_prices_for_tokens( From 458b1894584cd2e7c752f6134271358cf7693ac2 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Fri, 11 Oct 2024 12:51:51 +0200 Subject: [PATCH 38/39] add back restart logic for old data --- src/transaction_processor.py | 46 +++++++++++++++--------------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/src/transaction_processor.py b/src/transaction_processor.py index 1f31c02..71e4312 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -50,45 +50,37 @@ def get_start_block(self) -> int: If no entries are present, fallback to get_finalized_block_number(). """ try: - # 1) get last transaction from DB + # 1) get latest block on chain + block_number_latest = self.blockchain_data.get_latest_block() + # 2) get last transaction from DB latest_tx_hash = self.db.get_latest_transaction() - # 2) get block of that transaction + # 3) get block of that transaction if latest_tx_hash: - block_number = int( + block_number_db = int( self.blockchain_data.web3.eth.get_transaction_receipt( HexBytes(latest_tx_hash) )["blockNumber"] ) + if block_number_db < block_number_latest - 7200: + # TODO: Remove this rule before moving to production. + logger.warning( + "Only old transactions found in database, latest was on block" + f"{block_number_db}. Using recent block instead." + ) + start_block = block_number_latest + else: + start_block = block_number_db + 1 else: logger.warning( - f"No transaction found in database. Using recent block instead." + "No transaction found in database. Using recent block instead." ) - block_number = self.blockchain_data.get_latest_block() - return block_number + 1 - - # # Query for the maximum block number - # query_max_block = read_sql_file("src/sql/select_max_block.sql") - # result = self.db.execute_query( - # query_max_block, {"chain_name": self.chain_name} - # ) - # row = result.fetchone() - # max_block = row[0] if row is not None else None - # blockchain_latest_block = self.blockchain_data.get_latest_block() - # - # # If no entries present, fallback to get_latest_block() - # if max_block is None: - # return blockchain_latest_block - # - # logger.info("Fetched max block number from database: %d", max_block) - # if max_block > blockchain_latest_block - 7200: - # return max_block + 1 - # else: - # # TODO: Remove this rule before moving to production. - # return blockchain_latest_block + start_block = block_number_latest except Exception as e: - logger.error("Error fetching start block from database: %s", e) + logger.error(f"Error fetching start block: {e}") raise + return start_block + def process(self, start_block: int) -> None: """Main Daemon loop that finds imbalances for txs and prices.""" previous_block = start_block From 22b432354d5ae61e21dec53cd6360ff59d060b6e Mon Sep 17 00:00:00 2001 From: harisang Date: Mon, 14 Oct 2024 03:11:32 +0300 Subject: [PATCH 39/39] keep compute imbalances from old code --- src/transaction_processor.py | 83 ++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 37 deletions(-) diff --git a/src/transaction_processor.py b/src/transaction_processor.py index 71e4312..af42ace 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -118,6 +118,11 @@ def process_single_transaction( """Function processes a single tx to find imbalances, fees, prices including writing to database.""" self.log_message = [] try: + # compute raw token imbalances + token_imbalances = self.process_token_imbalances( + tx_hash, auction_id, block_number + ) + # get transaction timestamp transaction_timestamp = self.blockchain_data.get_transaction_timestamp( tx_hash @@ -126,8 +131,12 @@ def process_single_transaction( self.db.write_transaction_timestamp(transaction_timestamp) # get transaction tokens - transaction_tokens = self.blockchain_data.get_transaction_tokens(tx_hash) + # transaction_tokens = self.blockchain_data.get_transaction_tokens(tx_hash) # store transaction tokens + transaction_tokens = [] + for token_address, imbalance in token_imbalances.items(): + if imbalance != 0: + transaction_tokens.append((tx_hash, token_address)) self.db.write_transaction_tokens(transaction_tokens) # update token decimals @@ -141,50 +150,50 @@ def process_single_transaction( self.db.write_prices_new(prices_new) # Compute Raw Token Imbalances - if self.process_imbalances: - token_imbalances = self.process_token_imbalances( - tx_hash, auction_id, block_number - ) - - # Compute Fees - if self.process_fees: - ( - protocol_fees, - partner_fees, - network_fees, - ) = self.process_fees_for_transaction(tx_hash) - - # Compute Prices - if self.process_prices and self.process_imbalances: - prices = self.process_prices_for_tokens( - token_imbalances, block_number, tx_hash - ) + # if self.process_imbalances: + # token_imbalances = self.process_token_imbalances( + # tx_hash, auction_id, block_number + # ) + + # # Compute Fees + # if self.process_fees: + # ( + # protocol_fees, + # partner_fees, + # network_fees, + # ) = self.process_fees_for_transaction(tx_hash) + + # # Compute Prices + # if self.process_prices and self.process_imbalances: + # prices = self.process_prices_for_tokens( + # token_imbalances, block_number, tx_hash + # ) # Write to database iff no errors in either computations - if ( - (not self.process_imbalances) - and (not self.process_fees) - and (not self.process_prices) - ): - return + # if ( + # (not self.process_imbalances) + # and (not self.process_fees) + # and (not self.process_prices) + # ): + # return if self.process_imbalances and token_imbalances: self.handle_imbalances( token_imbalances, tx_hash, auction_id, block_number ) - if self.process_fees: - self.handle_fees( - protocol_fees, - partner_fees, - network_fees, - auction_id, - block_number, - tx_hash, - ) - - if self.process_prices and prices: - self.handle_prices(prices, tx_hash, block_number) + # if self.process_fees: + # self.handle_fees( + # protocol_fees, + # partner_fees, + # network_fees, + # auction_id, + # block_number, + # tx_hash, + # ) + + # if self.process_prices and prices: + # self.handle_prices(prices, tx_hash, block_number) logger.info("\n".join(self.log_message))