diff --git a/src/transaction_processor.py b/src/transaction_processor.py index 1a38eb6..c6da16f 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -39,6 +39,43 @@ def __init__( self.price_providers = PriceFeed(activate=process_prices) self.log_message: list[str] = [] + ###################### MAIN RUN LOOP ###################### + def run(self) -> None: + """Main Daemon loop that processes txs and computes imbalances, + relevant prices and fees, if needed.""" + + start_block = self.get_start_block() + previous_block = start_block + unprocessed_txs: list[tuple[str, int, int]] = [] + logger.info("%s daemon started. Start block: %d", self.chain_name, start_block) + + while True: + try: + latest_block = self.blockchain_data.get_latest_block() + new_txs = self.blockchain_data.fetch_tx_data( + previous_block, latest_block + ) + all_txs = new_txs + unprocessed_txs + unprocessed_txs.clear() + + for tx_hash, auction_id, block_number in all_txs: + try: + self.process_single_transaction( + tx_hash, auction_id, block_number + ) + except Exception as e: + unprocessed_txs.append((tx_hash, auction_id, block_number)) + logger.error(f"Error processing transaction {tx_hash}: {e}") + + previous_block = latest_block + 1 + time.sleep(CHAIN_SLEEP_TIME) + + except Exception as e: + logger.error(f"Error in processing loop: {e}") + time.sleep(CHAIN_SLEEP_TIME) + + ########################################################### + def get_start_block(self) -> int: """ Retrieve the most recent block X already present in raw_token_imbalances table, @@ -80,115 +117,23 @@ def get_start_block(self) -> int: return start_block - def run(self) -> None: - """Main Daemon loop that processes txs and computes imbalances, - relevant prices and fees, if needed.""" - - start_block = self.get_start_block() - previous_block = start_block - unprocessed_txs: list[tuple[str, int, int]] = [] - logger.info("%s daemon started. Start block: %d", self.chain_name, start_block) - - while True: - try: - latest_block = self.blockchain_data.get_latest_block() - new_txs = self.blockchain_data.fetch_tx_data( - previous_block, latest_block - ) - all_txs = new_txs + unprocessed_txs - unprocessed_txs.clear() - - for tx_hash, auction_id, block_number in all_txs: - try: - self.process_single_transaction( - tx_hash, auction_id, block_number - ) - except Exception as e: - unprocessed_txs.append((tx_hash, auction_id, block_number)) - logger.error(f"Error processing transaction {tx_hash}: {e}") - - previous_block = latest_block + 1 - time.sleep(CHAIN_SLEEP_TIME) - - except Exception as e: - logger.error(f"Error in processing loop: {e}") - time.sleep(CHAIN_SLEEP_TIME) - def process_single_transaction( - self, tx_hash: str, auction_id: int, block_number: int + self, + tx_hash: str, + auction_id: int, + block_number: int, ) -> None: """Function processes a single tx to find imbalances, fees, prices including writing to database.""" self.log_message = [] try: - # get transaction timestamp - transaction_timestamp = self.blockchain_data.get_transaction_timestamp( - tx_hash - ) - # store transaction timestamp - self.db.write_transaction_timestamp(transaction_timestamp) - - # get transaction tokens - transaction_tokens = self.imbalances.get_transaction_tokens(tx_hash) - # store transaction tokens - self.db.write_transaction_tokens(transaction_tokens) - - # update token decimals - update_token_decimals(self.db, self.blockchain_data) - - # get prices - prices_new = self.get_prices_for_tokens( - transaction_timestamp, transaction_tokens - ) - # store prices - self.db.write_prices_new(prices_new) - - # Compute Raw Token Imbalances - # if self.process_imbalances: - # token_imbalances = self.process_token_imbalances( - # tx_hash, auction_id, block_number - # ) - - # # Compute Fees - # if self.process_fees: - # ( - # protocol_fees, - # partner_fees, - # network_fees, - # ) = self.process_fees_for_transaction(tx_hash) - - # # Compute Prices - # if self.process_prices and self.process_imbalances: - # prices = self.process_prices_for_tokens( - # token_imbalances, block_number, tx_hash - # ) - - # Write to database iff no errors in either computations - # if ( - # (not self.process_imbalances) - # and (not self.process_fees) - # and (not self.process_prices) - # ): - # return + if self.process_prices: + self.process_tx_prices(tx_hash) if self.process_imbalances: - token_imbalances = self.imbalances.compute_token_imbalances(tx_hash) - if token_imbalances: - self.handle_imbalances( - token_imbalances, tx_hash, auction_id, block_number - ) + self.process_tx_imbalances(tx_hash, auction_id, block_number) - # if self.process_fees: - # self.handle_fees( - # protocol_fees, - # partner_fees, - # network_fees, - # auction_id, - # block_number, - # tx_hash, - # ) - - # if self.process_prices and prices: - # self.handle_prices(prices, tx_hash, block_number) + if self.process_fees: + self.process_tx_fees(tx_hash, auction_id, block_number) logger.info("\n".join(self.log_message)) @@ -196,23 +141,26 @@ def process_single_transaction( logger.error(f"An Error occurred: {err}") return - def process_fees_for_transaction( - self, - tx_hash: str, - ) -> tuple[ - dict[str, tuple[str, int]], - dict[str, tuple[str, int, str]], - dict[str, tuple[str, int]], - ]: - """Process and return protocol and network fees for a given transaction.""" - try: - protocol_fees, partner_fees, network_fees = compute_all_fees_of_batch( - HexBytes(tx_hash) - ) - return protocol_fees, partner_fees, network_fees - except Exception as e: - logger.error(f"Failed to process fees for transaction {tx_hash}: {e}") - return {}, {}, {} + def process_tx_prices(self, tx_hash: str) -> None: + # get transaction timestamp + transaction_timestamp = self.blockchain_data.get_transaction_timestamp(tx_hash) + # store transaction timestamp + self.db.write_transaction_timestamp(transaction_timestamp) + + # get transaction tokens + transaction_tokens = self.imbalances.get_transaction_tokens(tx_hash) + # store transaction tokens + self.db.write_transaction_tokens(transaction_tokens) + + # update token decimals + update_token_decimals(self.db, self.blockchain_data) + + # get prices + prices_new = self.get_prices_for_tokens( + transaction_timestamp, transaction_tokens + ) + # store prices + self.db.write_prices_new(prices_new) def get_prices_for_tokens( self, @@ -247,61 +195,42 @@ def get_prices_for_tokens( return prices - def process_prices_for_tokens( - self, - token_imbalances: dict[str, int], - block_number: int, - tx_hash: str, - ) -> dict[str, tuple[float, str]]: - """Compute prices for tokens with non-null imbalances.""" - prices = {} - try: - for token_address in token_imbalances.keys(): - price_data = self.price_providers.get_price( - set_params(token_address, block_number, tx_hash) - ) - if price_data: - price, source = price_data[0] - prices[token_address] = (price, source) - except Exception as e: - logger.error(f"Failed to process prices for transaction {tx_hash}: {e}") - - return prices + def process_tx_imbalances( + self, tx_hash: str, auction_id: int, block_number: int + ) -> None: + token_imbalances = self.imbalances.compute_token_imbalances(tx_hash) + if token_imbalances: + try: + for token_address, imbalance in token_imbalances.items(): + if imbalance != 0: + self.db.write_token_imbalances( + tx_hash, + auction_id, + block_number, + token_address, + imbalance, + ) + self.log_message.append( + f"Token: {token_address}, Imbalance: {imbalance}" + ) + except Exception as err: + logger.error(f"Error: {err}") - def handle_imbalances( + def process_tx_fees( self, - token_imbalances: dict[str, int], tx_hash: str, auction_id: int, block_number: int, ) -> None: - """Function loops over non-null raw imbalances and writes them to the database.""" + """Process protocol, partner and network fees for a given transaction.""" try: - for token_address, imbalance in token_imbalances.items(): - if imbalance != 0: - self.db.write_token_imbalances( - tx_hash, - auction_id, - block_number, - token_address, - imbalance, - ) - self.log_message.append( - f"Token: {token_address}, Imbalance: {imbalance}" - ) - except Exception as err: - logger.error(f"Error: {err}") + protocol_fees, partner_fees, network_fees = compute_all_fees_of_batch( + HexBytes(tx_hash) + ) + except Exception as e: + logger.error(f"Failed to compute fees for transaction {tx_hash}: {e}") + return - def handle_fees( - self, - protocol_fees: dict[str, tuple[str, int]], - partner_fees: dict[str, tuple[str, int, str]], - network_fees: dict[str, tuple[str, int]], - auction_id: int, - block_number: int, - tx_hash: str, - ): - """This function loops over (token, fee) and calls write_fees to write to table.""" try: # Write protocol fees for order_uid, (token_address, fee_amount) in protocol_fees.items(): @@ -350,19 +279,6 @@ def handle_fees( f"Failed to write fees to database for transaction {tx_hash}: {err}" ) - def handle_prices( - self, prices: dict[str, tuple[float, str]], tx_hash: str, block_number: int - ) -> None: - """Function writes prices to table per token.""" - try: - for token_address, (price, source) in prices.items(): - self.db.write_prices( - source, block_number, tx_hash, token_address, price - ) - self.log_message.append(f"Token: {token_address}, Price: {price} ETH") - except Exception as err: - logger.error(f"Error: {err}") - def calculate_slippage( token_imbalances: dict[str, int],