Skip to content

Commit

Permalink
restructure TransactionProcessor class
Browse files Browse the repository at this point in the history
  • Loading branch information
harisang committed Oct 16, 2024
1 parent 6d41ab6 commit 41c80c1
Showing 1 changed file with 94 additions and 178 deletions.
272 changes: 94 additions & 178 deletions src/transaction_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,43 @@ def __init__(
self.price_providers = PriceFeed(activate=process_prices)
self.log_message: list[str] = []

###################### MAIN RUN LOOP ######################
def run(self) -> None:
"""Main Daemon loop that processes txs and computes imbalances,
relevant prices and fees, if needed."""

start_block = self.get_start_block()
previous_block = start_block
unprocessed_txs: list[tuple[str, int, int]] = []
logger.info("%s daemon started. Start block: %d", self.chain_name, start_block)

while True:
try:
latest_block = self.blockchain_data.get_latest_block()
new_txs = self.blockchain_data.fetch_tx_data(
previous_block, latest_block
)
all_txs = new_txs + unprocessed_txs
unprocessed_txs.clear()

for tx_hash, auction_id, block_number in all_txs:
try:
self.process_single_transaction(
tx_hash, auction_id, block_number
)
except Exception as e:
unprocessed_txs.append((tx_hash, auction_id, block_number))
logger.error(f"Error processing transaction {tx_hash}: {e}")

previous_block = latest_block + 1
time.sleep(CHAIN_SLEEP_TIME)

except Exception as e:
logger.error(f"Error in processing loop: {e}")
time.sleep(CHAIN_SLEEP_TIME)

###########################################################

def get_start_block(self) -> int:
"""
Retrieve the most recent block X already present in raw_token_imbalances table,
Expand Down Expand Up @@ -80,139 +117,50 @@ def get_start_block(self) -> int:

return start_block

def run(self) -> None:
"""Main Daemon loop that processes txs and computes imbalances,
relevant prices and fees, if needed."""

start_block = self.get_start_block()
previous_block = start_block
unprocessed_txs: list[tuple[str, int, int]] = []
logger.info("%s daemon started. Start block: %d", self.chain_name, start_block)

while True:
try:
latest_block = self.blockchain_data.get_latest_block()
new_txs = self.blockchain_data.fetch_tx_data(
previous_block, latest_block
)
all_txs = new_txs + unprocessed_txs
unprocessed_txs.clear()

for tx_hash, auction_id, block_number in all_txs:
try:
self.process_single_transaction(
tx_hash, auction_id, block_number
)
except Exception as e:
unprocessed_txs.append((tx_hash, auction_id, block_number))
logger.error(f"Error processing transaction {tx_hash}: {e}")

previous_block = latest_block + 1
time.sleep(CHAIN_SLEEP_TIME)

except Exception as e:
logger.error(f"Error in processing loop: {e}")
time.sleep(CHAIN_SLEEP_TIME)

def process_single_transaction(
self, tx_hash: str, auction_id: int, block_number: int
self,
tx_hash: str,
auction_id: int,
block_number: int,
) -> None:
"""Function processes a single tx to find imbalances, fees, prices including writing to database."""
self.log_message = []
try:
# get transaction timestamp
transaction_timestamp = self.blockchain_data.get_transaction_timestamp(
tx_hash
)
# store transaction timestamp
self.db.write_transaction_timestamp(transaction_timestamp)

# get transaction tokens
transaction_tokens = self.imbalances.get_transaction_tokens(tx_hash)
# store transaction tokens
self.db.write_transaction_tokens(transaction_tokens)

# update token decimals
update_token_decimals(self.db, self.blockchain_data)

# get prices
prices_new = self.get_prices_for_tokens(
transaction_timestamp, transaction_tokens
)
# store prices
self.db.write_prices_new(prices_new)

# Compute Raw Token Imbalances
# if self.process_imbalances:
# token_imbalances = self.process_token_imbalances(
# tx_hash, auction_id, block_number
# )

# # Compute Fees
# if self.process_fees:
# (
# protocol_fees,
# partner_fees,
# network_fees,
# ) = self.process_fees_for_transaction(tx_hash)

# # Compute Prices
# if self.process_prices and self.process_imbalances:
# prices = self.process_prices_for_tokens(
# token_imbalances, block_number, tx_hash
# )

# Write to database iff no errors in either computations
# if (
# (not self.process_imbalances)
# and (not self.process_fees)
# and (not self.process_prices)
# ):
# return
if self.process_prices:
self.process_tx_prices(tx_hash)

if self.process_imbalances:
token_imbalances = self.imbalances.compute_token_imbalances(tx_hash)
if token_imbalances:
self.handle_imbalances(
token_imbalances, tx_hash, auction_id, block_number
)
self.process_tx_imbalances(tx_hash, auction_id, block_number)

# if self.process_fees:
# self.handle_fees(
# protocol_fees,
# partner_fees,
# network_fees,
# auction_id,
# block_number,
# tx_hash,
# )

# if self.process_prices and prices:
# self.handle_prices(prices, tx_hash, block_number)
if self.process_fees:
self.process_tx_fees(tx_hash, auction_id, block_number)

logger.info("\n".join(self.log_message))

except Exception as err:
logger.error(f"An Error occurred: {err}")
return

def process_fees_for_transaction(
self,
tx_hash: str,
) -> tuple[
dict[str, tuple[str, int]],
dict[str, tuple[str, int, str]],
dict[str, tuple[str, int]],
]:
"""Process and return protocol and network fees for a given transaction."""
try:
protocol_fees, partner_fees, network_fees = compute_all_fees_of_batch(
HexBytes(tx_hash)
)
return protocol_fees, partner_fees, network_fees
except Exception as e:
logger.error(f"Failed to process fees for transaction {tx_hash}: {e}")
return {}, {}, {}
def process_tx_prices(self, tx_hash: str) -> None:
# get transaction timestamp
transaction_timestamp = self.blockchain_data.get_transaction_timestamp(tx_hash)
# store transaction timestamp
self.db.write_transaction_timestamp(transaction_timestamp)

# get transaction tokens
transaction_tokens = self.imbalances.get_transaction_tokens(tx_hash)
# store transaction tokens
self.db.write_transaction_tokens(transaction_tokens)

# update token decimals
update_token_decimals(self.db, self.blockchain_data)

# get prices
prices_new = self.get_prices_for_tokens(
transaction_timestamp, transaction_tokens
)
# store prices
self.db.write_prices_new(prices_new)

def get_prices_for_tokens(
self,
Expand Down Expand Up @@ -247,61 +195,42 @@ def get_prices_for_tokens(

return prices

def process_prices_for_tokens(
self,
token_imbalances: dict[str, int],
block_number: int,
tx_hash: str,
) -> dict[str, tuple[float, str]]:
"""Compute prices for tokens with non-null imbalances."""
prices = {}
try:
for token_address in token_imbalances.keys():
price_data = self.price_providers.get_price(
set_params(token_address, block_number, tx_hash)
)
if price_data:
price, source = price_data[0]
prices[token_address] = (price, source)
except Exception as e:
logger.error(f"Failed to process prices for transaction {tx_hash}: {e}")

return prices
def process_tx_imbalances(
self, tx_hash: str, auction_id: int, block_number: int
) -> None:
token_imbalances = self.imbalances.compute_token_imbalances(tx_hash)
if token_imbalances:
try:
for token_address, imbalance in token_imbalances.items():
if imbalance != 0:
self.db.write_token_imbalances(
tx_hash,
auction_id,
block_number,
token_address,
imbalance,
)
self.log_message.append(
f"Token: {token_address}, Imbalance: {imbalance}"
)
except Exception as err:
logger.error(f"Error: {err}")

def handle_imbalances(
def process_tx_fees(
self,
token_imbalances: dict[str, int],
tx_hash: str,
auction_id: int,
block_number: int,
) -> None:
"""Function loops over non-null raw imbalances and writes them to the database."""
"""Process protocol, partner and network fees for a given transaction."""
try:
for token_address, imbalance in token_imbalances.items():
if imbalance != 0:
self.db.write_token_imbalances(
tx_hash,
auction_id,
block_number,
token_address,
imbalance,
)
self.log_message.append(
f"Token: {token_address}, Imbalance: {imbalance}"
)
except Exception as err:
logger.error(f"Error: {err}")
protocol_fees, partner_fees, network_fees = compute_all_fees_of_batch(
HexBytes(tx_hash)
)
except Exception as e:
logger.error(f"Failed to compute fees for transaction {tx_hash}: {e}")
return

def handle_fees(
self,
protocol_fees: dict[str, tuple[str, int]],
partner_fees: dict[str, tuple[str, int, str]],
network_fees: dict[str, tuple[str, int]],
auction_id: int,
block_number: int,
tx_hash: str,
):
"""This function loops over (token, fee) and calls write_fees to write to table."""
try:
# Write protocol fees
for order_uid, (token_address, fee_amount) in protocol_fees.items():
Expand Down Expand Up @@ -350,19 +279,6 @@ def handle_fees(
f"Failed to write fees to database for transaction {tx_hash}: {err}"
)

def handle_prices(
self, prices: dict[str, tuple[float, str]], tx_hash: str, block_number: int
) -> None:
"""Function writes prices to table per token."""
try:
for token_address, (price, source) in prices.items():
self.db.write_prices(
source, block_number, tx_hash, token_address, price
)
self.log_message.append(f"Token: {token_address}, Price: {price} ETH")
except Exception as err:
logger.error(f"Error: {err}")


def calculate_slippage(
token_imbalances: dict[str, int],
Expand Down

0 comments on commit 41c80c1

Please sign in to comment.