From 237d81405af2e43d8caad469cf3e3b49390a5ce5 Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Tue, 14 Jan 2025 23:37:59 -0500 Subject: [PATCH] use the message store for mempool validation (#218) Switch from using the trie to the account store for the duplicate message check in the mempool. --- src/mempool/mempool.rs | 103 ++++++++++-------- src/mempool/mempool_test.rs | 57 +++++++++- .../store/account/onchain_event_store.rs | 8 ++ src/storage/store/test_helper.rs | 19 +++- 4 files changed, 138 insertions(+), 49 deletions(-) diff --git a/src/mempool/mempool.rs b/src/mempool/mempool.rs index 65297b1..431bc38 100644 --- a/src/mempool/mempool.rs +++ b/src/mempool/mempool.rs @@ -7,8 +7,15 @@ use tokio::{ }; use crate::storage::{ - store::{engine::MempoolMessage, stores::Stores}, - trie::merkle_trie::{self, TrieKey}, + db::RocksDbTransactionBatch, + store::{ + account::{ + get_message_by_key, make_message_primary_key, make_ts_hash, type_to_set_postfix, + UserDataStore, + }, + engine::MempoolMessage, + stores::Stores, + }, }; use super::routing::{MessageRouter, ShardRouter}; @@ -62,29 +69,62 @@ impl Mempool { } } - fn message_exists_in_trie(&mut self, fid: u64, trie_key: Vec) -> bool { + fn message_already_exists(&mut self, message: &MempoolMessage) -> bool { + let fid = message.fid(); let shard = self.message_router.route_message(fid, self.num_shards); let stores = self.shard_stores.get_mut(&shard); + // Default to false in the orror paths match stores { None => { error!("Error finding store for shard: {}", shard); false } - Some(stores) => { - // TODO(aditi): The engine reloads its ref to the trie on commit but we maintain a separate ref to the trie here. - stores.trie.reload(&stores.db).unwrap(); - match stores.trie.exists( - &merkle_trie::Context::new(), - &stores.db, - trie_key.as_ref(), - ) { - Err(err) => { - error!("Error finding key in trie: {}", err); - false + Some(stores) => match message { + MempoolMessage::UserMessage(message) => match &message.data { + None => false, + Some(message_data) => { + let ts_hash = make_ts_hash(message_data.timestamp, &message.hash).unwrap(); + let set_postfix = type_to_set_postfix(message_data.r#type()); + let primary_key = + make_message_primary_key(fid, set_postfix as u8, Some(&ts_hash)); + let existing_message = get_message_by_key( + &stores.db, + &mut RocksDbTransactionBatch::new(), + &primary_key, + ); + match existing_message { + Ok(Some(_)) => true, + Err(_) | Ok(None) => false, + } + } + }, + MempoolMessage::ValidatorMessage(message) => { + if let Some(onchain_event) = &message.on_chain_event { + match stores.onchain_event_store.exists(&onchain_event) { + Err(_) => return false, + Ok(exists) => return exists, + } + } + + if let Some(fname_transfer) = &message.fname_transfer { + match &fname_transfer.proof { + None => return false, + Some(proof) => { + let username_proof = UserDataStore::get_username_proof( + &stores.user_data_store, + &mut RocksDbTransactionBatch::new(), + &proof.name, + ); + match username_proof { + Err(_) | Ok(None) => return false, + Ok(Some(_)) => return true, + } + } + } } - Ok(exists) => exists, + return false; } - } + }, } } @@ -112,37 +152,8 @@ impl Mempool { } } - fn get_trie_key(message: &MempoolMessage) -> Option> { - match message { - MempoolMessage::UserMessage(message) => return Some(TrieKey::for_message(message)), - MempoolMessage::ValidatorMessage(validator_message) => { - if let Some(onchain_event) = &validator_message.on_chain_event { - return Some(TrieKey::for_onchain_event(&onchain_event)); - } - - if let Some(fname_transfer) = &validator_message.fname_transfer { - if let Some(proof) = &fname_transfer.proof { - let name = String::from_utf8(proof.name.clone()).unwrap(); - return Some(TrieKey::for_fname(fname_transfer.id, &name)); - } - } - - return None; - } - } - } - - fn is_message_already_merged(&mut self, message: &MempoolMessage) -> bool { - let fid = message.fid(); - let trie_key = Self::get_trie_key(&message); - match trie_key { - Some(trie_key) => self.message_exists_in_trie(fid, trie_key), - None => false, - } - } - pub fn message_is_valid(&mut self, message: &MempoolMessage) -> bool { - if self.is_message_already_merged(message) { + if self.message_already_exists(message) { return false; } diff --git a/src/mempool/mempool_test.rs b/src/mempool/mempool_test.rs index f243d29..e2858d6 100644 --- a/src/mempool/mempool_test.rs +++ b/src/mempool/mempool_test.rs @@ -6,11 +6,12 @@ mod tests { use crate::{ mempool::mempool::Mempool, + proto::{FnameTransfer, UserNameProof, UserNameType, ValidatorMessage}, storage::store::{ engine::{MempoolMessage, ShardEngine}, test_helper, }, - utils::factory::messages_factory, + utils::factory::{events_factory, messages_factory}, }; use self::test_helper::{default_custody_address, default_signer}; @@ -28,7 +29,7 @@ mod tests { } #[tokio::test] - async fn test_duplicate_message_is_invalid() { + async fn test_duplicate_user_message_is_invalid() { let (mut engine, mut mempool) = setup(); test_helper::register_user( 1234, @@ -44,4 +45,56 @@ mod tests { let valid = mempool.message_is_valid(&MempoolMessage::UserMessage(cast.clone())); assert!(!valid) } + + #[tokio::test] + async fn test_duplicate_onchain_event_is_invalid() { + let (mut engine, mut mempool) = setup(); + let onchain_event = events_factory::create_rent_event(1234, Some(10), None, false); + let valid = mempool.message_is_valid(&MempoolMessage::ValidatorMessage(ValidatorMessage { + on_chain_event: Some(onchain_event.clone()), + fname_transfer: None, + })); + assert!(valid); + test_helper::commit_event(&mut engine, &onchain_event).await; + let valid = mempool.message_is_valid(&MempoolMessage::ValidatorMessage(ValidatorMessage { + on_chain_event: Some(onchain_event.clone()), + fname_transfer: None, + })); + assert!(!valid) + } + + #[tokio::test] + async fn test_duplicate_fname_transfer_is_invalid() { + let (mut engine, mut mempool) = setup(); + test_helper::register_user( + 1234, + default_signer(), + default_custody_address(), + &mut engine, + ) + .await; + let fname_transfer = FnameTransfer { + id: 1234, + from_fid: 0, + proof: Some(UserNameProof { + timestamp: messages_factory::farcaster_time() as u64, + name: "farcaster".as_bytes().to_vec(), + owner: default_custody_address(), + signature: "signature".as_bytes().to_vec(), + fid: 1234, + r#type: UserNameType::UsernameTypeEnsL1 as i32, + }), + }; + let valid = mempool.message_is_valid(&MempoolMessage::ValidatorMessage(ValidatorMessage { + on_chain_event: None, + fname_transfer: Some(fname_transfer.clone()), + })); + assert!(valid); + test_helper::commit_fname_transfer(&mut engine, &fname_transfer).await; + let valid = mempool.message_is_valid(&MempoolMessage::ValidatorMessage(ValidatorMessage { + on_chain_event: None, + fname_transfer: Some(fname_transfer), + })); + assert!(!valid) + } } diff --git a/src/storage/store/account/onchain_event_store.rs b/src/storage/store/account/onchain_event_store.rs index 7aadbf0..4a15cd0 100644 --- a/src/storage/store/account/onchain_event_store.rs +++ b/src/storage/store/account/onchain_event_store.rs @@ -465,4 +465,12 @@ impl OnchainEventStore { } Ok(storage_slot) } + + pub fn exists(&self, onchain_event: &OnChainEvent) -> Result { + let primary_key = make_onchain_event_primary_key(onchain_event); + match self.db.get(&primary_key)? { + None => Ok(false), + Some(_) => Ok(true), + } + } } diff --git a/src/storage/store/test_helper.rs b/src/storage/store/test_helper.rs index 5c0c81b..d7ae912 100644 --- a/src/storage/store/test_helper.rs +++ b/src/storage/store/test_helper.rs @@ -11,7 +11,8 @@ use tempfile; use tokio::sync::mpsc; use crate::core::error::HubError; -use crate::proto; +#[allow(unused_imports)] // Used by cfg(test) +use crate::proto::{self, FnameTransfer}; use crate::proto::{Height, ShardChunk, ShardHeader, Transaction}; use crate::proto::{MessagesResponse, OnChainEvent}; use crate::storage::store::account::MessagesPage; @@ -150,6 +151,22 @@ pub async fn commit_event(engine: &mut ShardEngine, event: &OnChainEvent) -> Sha validate_and_commit_state_change(engine, &state_change) } +#[cfg(test)] +pub async fn commit_fname_transfer( + engine: &mut ShardEngine, + fname_transfer: &FnameTransfer, +) -> ShardChunk { + let state_change = engine.propose_state_change( + 1, + vec![MempoolMessage::ValidatorMessage(proto::ValidatorMessage { + on_chain_event: None, + fname_transfer: Some(fname_transfer.clone()), + })], + ); + + validate_and_commit_state_change(engine, &state_change) +} + #[cfg(test)] pub async fn commit_message(engine: &mut ShardEngine, msg: &proto::Message) -> ShardChunk { let state_change =