Skip to content

Commit

Permalink
pylint 1
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhagarwal03 committed Jul 4, 2024
1 parent 8b5abe0 commit 4fde33b
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 30 deletions.
3 changes: 3 additions & 0 deletions contracts/erc20_abi.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""
ERC20 ABI contract
"""
erc20_abi = [
{
"constant": True,
Expand Down
2 changes: 1 addition & 1 deletion src/balanceof_imbalances.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,4 @@ def main():


if __name__ == "__main__":
main()
main()
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
""" Constants used for the token imbalances project """
from web3 import Web3

SETTLEMENT_CONTRACT_ADDRESS = Web3.to_checksum_address(
Expand Down
39 changes: 27 additions & 12 deletions src/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
Running this daemon computes raw imbalances for finalized blocks by calling imbalances_script.py.
"""
import time
from typing import List, Tuple
from threading import Thread
import psycopg2
import pandas as pd
from web3 import Web3
from typing import List
from threading import Thread
from sqlalchemy.engine import Engine
from src.imbalances_script import RawTokenImbalances
from src.config import (
Expand All @@ -26,6 +26,9 @@ def write_token_imbalances_to_db(
token_address: str,
imbalance,
):
"""
Write token imbalances to the database.
"""
try:
cursor = write_db_connection.cursor()
# Remove '0x' and then convert hex strings to bytes
Expand All @@ -50,23 +53,29 @@ def write_token_imbalances_to_db(

logger.info("Record inserted successfully.")
except psycopg2.Error as e:
logger.error(f"Error inserting record: {e}")
logger.error("Error inserting record: %s", e)
finally:
cursor.close()


def get_web3_instance(chain_name: str) -> Web3:
"""
returns a Web3 instance for the given blockchain via chain name.
"""
return Web3(Web3.HTTPProvider(CHAIN_RPC_ENDPOINTS[chain_name]))


def get_finalized_block_number(web3: Web3) -> int:
"""
Get the number of the most recent finalized block.
"""
return web3.eth.block_number - 64


def fetch_transaction_hashes(
read_db_connection: Engine, start_block: int, end_block: int
) -> List[str]:
"""Fetch transaction hashes beginning start_block."""
"""Fetch transaction hashes beginning from start_block to end_block. """
query = f"""
SELECT tx_hash, auction_id
FROM settlements
Expand All @@ -86,6 +95,9 @@ def fetch_transaction_hashes(


def process_transactions(chain_name: str) -> None:
"""
Process transactions to compute imbalances for a given blockchain via chain name.
"""
web3 = get_web3_instance(chain_name)
rt = RawTokenImbalances(web3, chain_name)
sleep_time = CHAIN_SLEEP_TIMES.get(chain_name)
Expand All @@ -94,7 +106,7 @@ def process_transactions(chain_name: str) -> None:
previous_block = get_finalized_block_number(web3)
unprocessed_txs = []

logger.info(f"{chain_name} Daemon started.")
logger.info("%s Daemon started.", chain_name)

while True:
try:
Expand All @@ -107,10 +119,10 @@ def process_transactions(chain_name: str) -> None:
unprocessed_txs.clear()

for tx, auction_id in all_txs:
logger.info(f"Processing transaction on {chain_name}: {tx}")
logger.info("Processing transaction on %s: %s", chain_name, tx)
try:
imbalances = rt.compute_imbalances(tx)
logger.info(f"Token Imbalances on {chain_name}:")
logger.info("Token Imbalances on %s:", chain_name)
for token_address, imbalance in imbalances.items():
write_token_imbalances_to_db(
chain_name,
Expand All @@ -120,23 +132,26 @@ def process_transactions(chain_name: str) -> None:
token_address,
imbalance,
)
logger.info(f"Token: {token_address}, Imbalance: {imbalance}")
logger.info("Token: %s, Imbalance: %s", token_address, imbalance)
except ValueError as e:
logger.error(e)
logger.error("ValueError: %s", e)
unprocessed_txs.append(tx)

previous_block = latest_block + 1
except ConnectionError as e:
logger.error(
f"Connection error processing transactions on {chain_name}: {e}"
"Connection error processing transactions on %s: %s", chain_name, e
)
except Exception as e:
logger.error(f"Error processing transactions on {chain_name}: {e}")
logger.error("Error processing transactions on %s: %s", chain_name, e)

time.sleep(sleep_time)


def main() -> None:
"""
Main function to start the daemon threads for each blockchain.
"""
threads = []

for chain_name in CHAIN_RPC_ENDPOINTS.keys():
Expand All @@ -149,4 +164,4 @@ def main() -> None:


if __name__ == "__main__":
main()
main()
38 changes: 21 additions & 17 deletions src/imbalances_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
adding the transfer value to existing inflow/outflow for the token addresses.
7. Returning to calculate_imbalances(), which finds the imbalance for all token addresses using
inflow-outflow.
8. If actions are not None, it denotes an ETH transfer event, which involves reducing WETH withdrawal
amount- > update_weth_imbalance(). The ETH imbalance is also calculated via -> update_native_eth_imbalance().
9. update_sdai_imbalance() is called in each iteration and only completes if there is an SDAI transfer
involved which has special handling for its events.
8. If actions are not None, it denotes an ETH transfer event, which involves reducing WETH
withdrawal amount- > update_weth_imbalance(). The ETH imbalance is also calculated
via -> update_native_eth_imbalance().
9. update_sdai_imbalance() is called in each iteration and only completes if there is an SDAI
transfer involved which has special handling for its events.
"""
from typing import Dict, List, Optional, Tuple

from web3 import Web3
from web3.datastructures import AttributeDict
from typing import Dict, List, Optional, Tuple
from web3.types import TxReceipt

from src.config import CHAIN_RPC_ENDPOINTS, logger
from src.constants import (
SETTLEMENT_CONTRACT_ADDRESS,
Expand Down Expand Up @@ -54,17 +57,18 @@ def find_chain_with_tx(tx_hash: str) -> Tuple[str, Web3]:
for chain_name, url in CHAIN_RPC_ENDPOINTS.items():
web3 = Web3(Web3.HTTPProvider(url))
if not web3.is_connected():
logger.warning(f"Could not connect to {chain_name}.")
logger.warning("Could not connect to %s.", chain_name)
continue
try:
web3.eth.get_transaction_receipt(tx_hash)
logger.info(f"Transaction found on {chain_name}.")
logger.info("Transaction found on %s.", chain_name)
return chain_name, web3
except Exception as e:
logger.debug(f"Transaction not found on {chain_name}: {e}")
except Exception as ex:
logger.debug("Transaction not found on %s: %s", chain_name, ex)
raise ValueError(f"Transaction hash {tx_hash} not found on any chain.")



def _to_int(value: str | int) -> int:
"""Convert hex string or integer to integer."""
try:
Expand All @@ -74,11 +78,12 @@ def _to_int(value: str | int) -> int:
else int(value)
)
except ValueError:
logger.error(f"Error converting value {value} to integer.")

logger.error("Error converting value %s to integer.", value)


class RawTokenImbalances:
"""Class for computing token imbalances."""

def __init__(self, web3: Web3, chain_name: str):
self.web3 = web3
self.chain_name = chain_name
Expand All @@ -89,8 +94,8 @@ def get_transaction_receipt(self, tx_hash: str) -> Optional[TxReceipt]:
"""
try:
return self.web3.eth.get_transaction_receipt(tx_hash)
except Exception as e:
logger.error(f"Error getting transaction receipt: {e}")
except Exception as ex:
logger.error("Error getting transaction receipt: %s", ex)
return None

def get_transaction_trace(self, tx_hash: str) -> Optional[List[Dict]]:
Expand All @@ -99,7 +104,7 @@ def get_transaction_trace(self, tx_hash: str) -> Optional[List[Dict]]:
res = self.web3.tracing.trace_transaction(tx_hash)
return res
except Exception as err:
logger.error(f"Error occurred while fetching transaction trace: {err}")
logger.error("Error occurred while fetching transaction trace: %s", err)
return None

def extract_actions(self, traces: List[AttributeDict], address: str) -> List[Dict]:
Expand Down Expand Up @@ -188,7 +193,7 @@ def decode_event(
else: # Withdrawal event
return from_address, None, value
except Exception as e:
logger.error(f"Error decoding event: {str(e)}")
logger.error("Error decoding event: %s", str(e))
return None, None, None

def process_event(
Expand Down Expand Up @@ -318,8 +323,8 @@ def compute_imbalances(self, tx_hash: str) -> Dict[str, int]:
return imbalances


# main method for finding imbalance for a single tx hash
def main() -> None:
""" main function for finding imbalance for a single tx hash. """
tx_hash = input("Enter transaction hash: ")
chain_name, web3 = find_chain_with_tx(tx_hash)
rt = RawTokenImbalances(web3, chain_name)
Expand All @@ -332,6 +337,5 @@ def main() -> None:
logger.error(e)



if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions tests/basic_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
""" Runs a basic test for raw imbalance calculation edge-cases. """
import pytest
from src.imbalances_script import RawTokenImbalances

Expand Down Expand Up @@ -33,6 +34,9 @@
],
)
def test_imbalances(tx_hash, expected_imbalances):
"""
Asserts imbalances match for main script with test values provided.
"""
rt = RawTokenImbalances()
imbalances, _ = rt.compute_imbalances(tx_hash)
for token_address, expected_imbalance in expected_imbalances.items():
Expand Down

0 comments on commit 4fde33b

Please sign in to comment.