Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Backend DB Dependency #21

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ def check_db_connection(connection: Engine, db_type: str) -> Engine:
return connection


def initialize_connections() -> Tuple[Web3, Engine, Engine]:
def initialize_connections() -> Tuple[Web3, Engine]:
web3 = get_web3_instance()
solver_slippage_db_connection = create_db_connection("solver_slippage")
backend_db_connection = create_db_connection("backend")

return web3, solver_slippage_db_connection, backend_db_connection
return web3, solver_slippage_db_connection
43 changes: 20 additions & 23 deletions src/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
import os
import time
from typing import List, Tuple
import pandas as pd
from web3 import Web3
from sqlalchemy import text
from sqlalchemy.engine import Engine
from src.imbalances_script import RawTokenImbalances
from src.helper_functions import get_finalized_block_number, read_sql_file
from src.helper_functions import (
get_finalized_block_number,
get_tx_hashes_blocks,
get_auction_id,
read_sql_file,
)
from src.config import (
initialize_connections,
CHAIN_SLEEP_TIME,
Expand All @@ -20,7 +24,7 @@


def get_start_block(
chain_name: str, solver_slippage_connection: Engine, web3: Web3
web3: Web3, chain_name: str, solver_slippage_connection: Engine
) -> int:
"""
Retrieve the most recent block already present in raw_token_imbalances table,
Expand Down Expand Up @@ -72,29 +76,22 @@ def get_start_block(


def fetch_tx_data(
backend_db_connection: Engine, start_block: int, end_block: int
web3: Web3, start_block: int, end_block: int
) -> List[Tuple[str, int, int]]:
"""
Fetch transaction data beginning from start_block to end_block.
Returns (tx_hash, auction_id, block_number) as a tuple.
"""
tx_data: List[Tuple[str, int, int]] = []
tx_hashes_blocks = get_tx_hashes_blocks(web3, start_block, end_block)

backend_db_connection = check_db_connection(backend_db_connection, "backend")
query = read_sql_file("src/sql/select_transactions.sql")

db_data = pd.read_sql(
text(query),
backend_db_connection,
params={"start_block": start_block, "end_block": end_block},
)
# converts hashes at memory location to hex
db_data["tx_hash"] = db_data["tx_hash"].apply(lambda x: f"0x{x.hex()}")
for tx_hash, block_number in tx_hashes_blocks:
try:
auction_id = get_auction_id(web3, tx_hash)
tx_data.append((tx_hash, auction_id, block_number))
except Exception as e:
print(f"Error fetching auction ID for {tx_hash}: {e}")

# return (tx hash, auction id) as tx_data
tx_data = [
(row["tx_hash"], row["auction_id"], row["block_number"])
for index, row in db_data.iterrows()
]
return tx_data


Expand Down Expand Up @@ -133,7 +130,8 @@ def write_token_imbalances_to_db(
imbalance: float,
) -> None:
"""
Write token imbalances to the database if the (tx_hash, token_address) combination does not already exist.
Write token imbalances to the database if the (tx_hash, token_address) pair does not already exist.
This is done by first calling record_exists().
"""
solver_slippage_connection = check_db_connection(
solver_slippage_connection, "solver_slippage"
Expand Down Expand Up @@ -207,10 +205,9 @@ def process_transactions(chain_name: str) -> None:
(
web3,
solver_slippage_db_connection,
backend_db_connection,
) = initialize_connections()
rt = RawTokenImbalances(web3, chain_name)
start_block = get_start_block(chain_name, solver_slippage_db_connection, web3)
start_block = get_start_block(web3, chain_name, solver_slippage_db_connection)
previous_block = start_block
unprocessed_txs: List[Tuple[str, int, int]] = []

Expand All @@ -219,7 +216,7 @@ def process_transactions(chain_name: str) -> None:
while True:
try:
latest_block = get_finalized_block_number(web3)
new_txs = fetch_tx_data(backend_db_connection, previous_block, latest_block)
new_txs = fetch_tx_data(web3, previous_block, latest_block)
# Add any unprocessed txs for processing, then clear list of unprocessed
all_txs = new_txs + unprocessed_txs
unprocessed_txs.clear()
Expand Down
36 changes: 35 additions & 1 deletion src/helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import sys
import os
import logging
from typing import Optional
from typing import List, Optional, Tuple
from dotenv import load_dotenv
from hexbytes import HexBytes
from web3 import Web3
from src.constants import SETTLEMENT_CONTRACT_ADDRESS

load_dotenv()
NODE_URL = os.getenv("NODE_URL")
Expand Down Expand Up @@ -67,6 +69,38 @@ def get_finalized_block_number(web3: Web3) -> int:
return web3.eth.block_number - 67


def get_tx_hashes_blocks(
web3: Web3, start_block: int, end_block: int
) -> List[Tuple[str, int]]:
"""
Get all transaction hashes appended with corresponding block (tuple) transactions
involving the settlement contract.
"""
tx_hashes_blocks = []

for block_number in range(start_block, end_block + 1):
block = web3.eth.get_block(block_number, full_transactions=True)
for tx in block.transactions: # type: ignore[attr-defined]
if tx.to and tx.to.lower() == SETTLEMENT_CONTRACT_ADDRESS.lower():
tx_hashes_blocks.append((tx.hash.hex(), block_number))
return tx_hashes_blocks


def get_auction_id(web3: Web3, tx_hash: str) -> int:
"""
Method that finds an auction id given a transaction hash.
"""
transaction = web3.eth.get_transaction(HexBytes(tx_hash))
call_data = transaction["input"]
# convert call_data to hexString if it's in hexBytes
call_data_bytes = bytes.fromhex(
call_data.hex()[2:] if isinstance(call_data, HexBytes) else call_data[2:]
)
# convert bytes to int
auction_id = int.from_bytes(call_data_bytes[-8:], byteorder="big")
return auction_id


def read_sql_file(file_path: str) -> str:
"""This function reads a file (SQL) and returns its content as a string."""
with open(file_path, "r") as file:
Expand Down
4 changes: 0 additions & 4 deletions src/sql/select_transactions.sql

This file was deleted.

Loading