Skip to content

Commit

Permalink
use the message store for mempool validation (#218)
Browse files Browse the repository at this point in the history
Switch from using the trie to the account store for the duplicate
message check in the mempool.
  • Loading branch information
aditiharini authored Jan 15, 2025
1 parent fc591e8 commit 237d814
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 49 deletions.
103 changes: 57 additions & 46 deletions src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@ use tokio::{
};

use crate::storage::{
store::{engine::MempoolMessage, stores::Stores},
trie::merkle_trie::{self, TrieKey},
db::RocksDbTransactionBatch,
store::{
account::{
get_message_by_key, make_message_primary_key, make_ts_hash, type_to_set_postfix,
UserDataStore,
},
engine::MempoolMessage,
stores::Stores,
},
};

use super::routing::{MessageRouter, ShardRouter};
Expand Down Expand Up @@ -62,29 +69,62 @@ impl Mempool {
}
}

fn message_exists_in_trie(&mut self, fid: u64, trie_key: Vec<u8>) -> bool {
fn message_already_exists(&mut self, message: &MempoolMessage) -> bool {
let fid = message.fid();
let shard = self.message_router.route_message(fid, self.num_shards);
let stores = self.shard_stores.get_mut(&shard);
// Default to false in the orror paths
match stores {
None => {
error!("Error finding store for shard: {}", shard);
false
}
Some(stores) => {
// TODO(aditi): The engine reloads its ref to the trie on commit but we maintain a separate ref to the trie here.
stores.trie.reload(&stores.db).unwrap();
match stores.trie.exists(
&merkle_trie::Context::new(),
&stores.db,
trie_key.as_ref(),
) {
Err(err) => {
error!("Error finding key in trie: {}", err);
false
Some(stores) => match message {
MempoolMessage::UserMessage(message) => match &message.data {
None => false,
Some(message_data) => {
let ts_hash = make_ts_hash(message_data.timestamp, &message.hash).unwrap();
let set_postfix = type_to_set_postfix(message_data.r#type());
let primary_key =
make_message_primary_key(fid, set_postfix as u8, Some(&ts_hash));
let existing_message = get_message_by_key(
&stores.db,
&mut RocksDbTransactionBatch::new(),
&primary_key,
);
match existing_message {
Ok(Some(_)) => true,
Err(_) | Ok(None) => false,
}
}
},
MempoolMessage::ValidatorMessage(message) => {
if let Some(onchain_event) = &message.on_chain_event {
match stores.onchain_event_store.exists(&onchain_event) {
Err(_) => return false,
Ok(exists) => return exists,
}
}

if let Some(fname_transfer) = &message.fname_transfer {
match &fname_transfer.proof {
None => return false,
Some(proof) => {
let username_proof = UserDataStore::get_username_proof(
&stores.user_data_store,
&mut RocksDbTransactionBatch::new(),
&proof.name,
);
match username_proof {
Err(_) | Ok(None) => return false,
Ok(Some(_)) => return true,
}
}
}
}
Ok(exists) => exists,
return false;
}
}
},
}
}

Expand Down Expand Up @@ -112,37 +152,8 @@ impl Mempool {
}
}

fn get_trie_key(message: &MempoolMessage) -> Option<Vec<u8>> {
match message {
MempoolMessage::UserMessage(message) => return Some(TrieKey::for_message(message)),
MempoolMessage::ValidatorMessage(validator_message) => {
if let Some(onchain_event) = &validator_message.on_chain_event {
return Some(TrieKey::for_onchain_event(&onchain_event));
}

if let Some(fname_transfer) = &validator_message.fname_transfer {
if let Some(proof) = &fname_transfer.proof {
let name = String::from_utf8(proof.name.clone()).unwrap();
return Some(TrieKey::for_fname(fname_transfer.id, &name));
}
}

return None;
}
}
}

fn is_message_already_merged(&mut self, message: &MempoolMessage) -> bool {
let fid = message.fid();
let trie_key = Self::get_trie_key(&message);
match trie_key {
Some(trie_key) => self.message_exists_in_trie(fid, trie_key),
None => false,
}
}

pub fn message_is_valid(&mut self, message: &MempoolMessage) -> bool {
if self.is_message_already_merged(message) {
if self.message_already_exists(message) {
return false;
}

Expand Down
57 changes: 55 additions & 2 deletions src/mempool/mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ mod tests {

use crate::{
mempool::mempool::Mempool,
proto::{FnameTransfer, UserNameProof, UserNameType, ValidatorMessage},
storage::store::{
engine::{MempoolMessage, ShardEngine},
test_helper,
},
utils::factory::messages_factory,
utils::factory::{events_factory, messages_factory},
};

use self::test_helper::{default_custody_address, default_signer};
Expand All @@ -28,7 +29,7 @@ mod tests {
}

#[tokio::test]
async fn test_duplicate_message_is_invalid() {
async fn test_duplicate_user_message_is_invalid() {
let (mut engine, mut mempool) = setup();
test_helper::register_user(
1234,
Expand All @@ -44,4 +45,56 @@ mod tests {
let valid = mempool.message_is_valid(&MempoolMessage::UserMessage(cast.clone()));
assert!(!valid)
}

#[tokio::test]
async fn test_duplicate_onchain_event_is_invalid() {
let (mut engine, mut mempool) = setup();
let onchain_event = events_factory::create_rent_event(1234, Some(10), None, false);
let valid = mempool.message_is_valid(&MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: Some(onchain_event.clone()),
fname_transfer: None,
}));
assert!(valid);
test_helper::commit_event(&mut engine, &onchain_event).await;
let valid = mempool.message_is_valid(&MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: Some(onchain_event.clone()),
fname_transfer: None,
}));
assert!(!valid)
}

#[tokio::test]
async fn test_duplicate_fname_transfer_is_invalid() {
let (mut engine, mut mempool) = setup();
test_helper::register_user(
1234,
default_signer(),
default_custody_address(),
&mut engine,
)
.await;
let fname_transfer = FnameTransfer {
id: 1234,
from_fid: 0,
proof: Some(UserNameProof {
timestamp: messages_factory::farcaster_time() as u64,
name: "farcaster".as_bytes().to_vec(),
owner: default_custody_address(),
signature: "signature".as_bytes().to_vec(),
fid: 1234,
r#type: UserNameType::UsernameTypeEnsL1 as i32,
}),
};
let valid = mempool.message_is_valid(&MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: None,
fname_transfer: Some(fname_transfer.clone()),
}));
assert!(valid);
test_helper::commit_fname_transfer(&mut engine, &fname_transfer).await;
let valid = mempool.message_is_valid(&MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: None,
fname_transfer: Some(fname_transfer),
}));
assert!(!valid)
}
}
8 changes: 8 additions & 0 deletions src/storage/store/account/onchain_event_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,4 +465,12 @@ impl OnchainEventStore {
}
Ok(storage_slot)
}

pub fn exists(&self, onchain_event: &OnChainEvent) -> Result<bool, OnchainEventStorageError> {
let primary_key = make_onchain_event_primary_key(onchain_event);
match self.db.get(&primary_key)? {
None => Ok(false),
Some(_) => Ok(true),
}
}
}
19 changes: 18 additions & 1 deletion src/storage/store/test_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use tempfile;
use tokio::sync::mpsc;

use crate::core::error::HubError;
use crate::proto;
#[allow(unused_imports)] // Used by cfg(test)
use crate::proto::{self, FnameTransfer};
use crate::proto::{Height, ShardChunk, ShardHeader, Transaction};
use crate::proto::{MessagesResponse, OnChainEvent};
use crate::storage::store::account::MessagesPage;
Expand Down Expand Up @@ -150,6 +151,22 @@ pub async fn commit_event(engine: &mut ShardEngine, event: &OnChainEvent) -> Sha
validate_and_commit_state_change(engine, &state_change)
}

#[cfg(test)]
pub async fn commit_fname_transfer(
engine: &mut ShardEngine,
fname_transfer: &FnameTransfer,
) -> ShardChunk {
let state_change = engine.propose_state_change(
1,
vec![MempoolMessage::ValidatorMessage(proto::ValidatorMessage {
on_chain_event: None,
fname_transfer: Some(fname_transfer.clone()),
})],
);

validate_and_commit_state_change(engine, &state_change)
}

#[cfg(test)]
pub async fn commit_message(engine: &mut ShardEngine, msg: &proto::Message) -> ShardChunk {
let state_change =
Expand Down

0 comments on commit 237d814

Please sign in to comment.