Skip to content

Commit

Permalink
Merge pull request #152 from idky137/update_mempool
Browse files Browse the repository at this point in the history
Update mempool
  • Loading branch information
AloeareV authored Jan 8, 2025
2 parents 5032d66 + 2068619 commit 3743544
Show file tree
Hide file tree
Showing 10 changed files with 1,327 additions and 81 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ tower = { version = "0.4", features = ["buffer", "util"] }
async-trait = "0.1"
chrono = "0.4"
jsonrpc-core = "18.0.0"
dashmap = "6.1"
10 changes: 9 additions & 1 deletion zaino-fetch/src/chain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,17 @@ impl Mempool {

let mut txids_to_exclude: HashSet<String> = HashSet::new();
for exclude_txid in &exclude_txids {
// Convert to big endian (server format).
let server_exclude_txid: String = exclude_txid
.chars()
.collect::<Vec<_>>()
.chunks(2)
.rev()
.map(|chunk| chunk.iter().collect::<String>())
.collect();
let matching_txids: Vec<&String> = mempool_txids
.iter()
.filter(|txid| txid.starts_with(exclude_txid))
.filter(|txid| txid.starts_with(&server_exclude_txid))
.collect();

if matching_txids.len() == 1 {
Expand Down
1 change: 1 addition & 0 deletions zaino-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ futures = { workspace = true }
tonic = { workspace = true }
http = { workspace = true }
lazy-regex = { workspace = true }
dashmap = { workspace = true }

[dev-dependencies]
zaino-testutils = { path = "../zaino-testutils" }
Expand Down
236 changes: 236 additions & 0 deletions zaino-state/src/broadcast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
//! Holds zaino-state::Broadcast, a thread safe broadcaster used by the mempool and non-finalised state.
use dashmap::DashMap;
use std::{collections::HashSet, hash::Hash, sync::Arc};
use tokio::sync::watch;

use crate::status::StatusType;

/// A generic, thread-safe broadcaster that manages mutable state and notifies clients of updates.
#[derive(Clone)]
pub struct Broadcast<K, V> {
state: Arc<DashMap<K, V>>,
notifier: watch::Sender<StatusType>,
}

impl<K: Eq + Hash + Clone, V: Clone> Broadcast<K, V> {
/// Creates a new Broadcast instance, uses default dashmap spec.
pub fn new_default() -> Self {
let (notifier, _) = watch::channel(StatusType::Spawning);
Self {
state: Arc::new(DashMap::new()),
notifier,
}
}

/// Creates a new Broadcast instance, exposes dashmap spec.
pub fn new_custom(capacity: usize, shard_amount: usize) -> Self {
let (notifier, _) = watch::channel(StatusType::Spawning);
Self {
state: Arc::new(DashMap::with_capacity_and_shard_amount(
capacity,
shard_amount,
)),
notifier,
}
}

/// Inserts or updates an entry in the state and broadcasts an update.
pub fn insert(&self, key: K, value: V, status: StatusType) {
self.state.insert(key, value);
let _ = self.notifier.send(status);
}

/// Inserts or updates an entry in the state and broadcasts an update.
pub fn insert_set(&self, set: Vec<(K, V)>, status: StatusType) {
for (key, value) in set {
self.state.insert(key, value);
}
let _ = self.notifier.send(status);
}

/// Inserts only new entries from the set into the state and broadcasts an update.
pub fn insert_filtered_set(&self, set: Vec<(K, V)>, status: StatusType) {
for (key, value) in set {
// Check if the key is already in the map
if self.state.get(&key).is_none() {
self.state.insert(key, value);
}
}
let _ = self.notifier.send(status);
}

/// Removes an entry from the state and broadcasts an update.
pub fn remove(&self, key: &K, status: StatusType) {
self.state.remove(key);
let _ = self.notifier.send(status);
}

/// Retrieves a value from the state by key.
pub fn get(&self, key: &K) -> Option<Arc<V>> {
self.state
.get(key)
.map(|entry| Arc::new((*entry.value()).clone()))
}

/// Retrieves a set of values from the state by a list of keys.
pub fn get_set(&self, keys: &[K]) -> Vec<(K, Arc<V>)> {
keys.iter()
.filter_map(|key| {
self.state
.get(key)
.map(|entry| (key.clone(), Arc::new((*entry.value()).clone())))
})
.collect()
}

/// Checks if a key exists in the state.
pub fn contains_key(&self, key: &K) -> bool {
self.state.contains_key(key)
}

/// Returns a receiver to listen for state update notifications.
pub fn subscribe(&self) -> watch::Receiver<StatusType> {
self.notifier.subscribe()
}

/// Returns a [`BroadcastSubscriber`] to the [`Broadcast`].
pub fn subscriber(&self) -> BroadcastSubscriber<K, V> {
BroadcastSubscriber {
state: self.get_state(),
notifier: self.subscribe(),
}
}

/// Provides read access to the internal state.
pub fn get_state(&self) -> Arc<DashMap<K, V>> {
Arc::clone(&self.state)
}

/// Returns the whole state excluding keys in the ignore list.
pub fn get_filtered_state(&self, ignore_list: &HashSet<K>) -> Vec<(K, V)> {
self.state
.iter()
.filter(|entry| !ignore_list.contains(entry.key()))
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect()
}

/// Clears all entries from the state.
pub fn clear(&self) {
self.state.clear();
}

/// Returns the number of entries in the state.
pub fn len(&self) -> usize {
self.state.len()
}

/// Returns true if the state is empty.
pub fn is_empty(&self) -> bool {
self.state.is_empty()
}

/// Broadcasts an update.
pub fn notify(&self, status: StatusType) {
if self.notifier.send(status).is_err() {
eprintln!("No subscribers are currently listening for updates.");
}
}
}

impl<K: Eq + Hash + Clone, V: Clone> Default for Broadcast<K, V> {
fn default() -> Self {
Self::new_default()
}
}

impl<K: Eq + Hash + Clone + std::fmt::Debug, V: Clone + std::fmt::Debug> std::fmt::Debug
for Broadcast<K, V>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state_contents: Vec<_> = self
.state
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect();
f.debug_struct("Broadcast")
.field("state", &state_contents)
.field("notifier", &"watch::Sender<StatusType>")
.finish()
}
}

/// A generic, thread-safe broadcaster that manages mutable state and notifies clients of updates.
#[derive(Clone)]
pub struct BroadcastSubscriber<K, V> {
state: Arc<DashMap<K, V>>,
notifier: watch::Receiver<StatusType>,
}

impl<K: Eq + Hash + Clone, V: Clone> BroadcastSubscriber<K, V> {
/// Waits on notifier update and returns StatusType.
pub async fn wait_on_notifier(&mut self) -> Result<StatusType, watch::error::RecvError> {
self.notifier.changed().await?;
let status = self.notifier.borrow().clone();
Ok(status)
}

/// Retrieves a value from the state by key.
pub fn get(&self, key: &K) -> Option<Arc<V>> {
self.state
.get(key)
.map(|entry| Arc::new((*entry.value()).clone()))
}

/// Retrieves a set of values from the state by a list of keys.
pub fn get_set(&self, keys: &[K]) -> Vec<(K, Arc<V>)> {
keys.iter()
.filter_map(|key| {
self.state
.get(key)
.map(|entry| (key.clone(), Arc::new((*entry.value()).clone())))
})
.collect()
}

/// Checks if a key exists in the state.
pub fn contains_key(&self, key: &K) -> bool {
self.state.contains_key(key)
}

/// Returns the whole state excluding keys in the ignore list.
pub fn get_filtered_state(&self, ignore_list: &HashSet<K>) -> Vec<(K, V)> {
self.state
.iter()
.filter(|entry| !ignore_list.contains(entry.key()))
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect()
}

/// Returns the number of entries in the state.
pub fn len(&self) -> usize {
self.state.len()
}

/// Returns true if the state is empty.
pub fn is_empty(&self) -> bool {
self.state.is_empty()
}
}

impl<K: Eq + Hash + Clone + std::fmt::Debug, V: Clone + std::fmt::Debug> std::fmt::Debug
for BroadcastSubscriber<K, V>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state_contents: Vec<_> = self
.state
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect();
f.debug_struct("Broadcast")
.field("state", &state_contents)
.field("notifier", &"watch::Sender<StatusType>")
.finish()
}
}
49 changes: 49 additions & 0 deletions zaino-state/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ pub enum FetchServiceError {
#[error("JsonRpcConnector error: {0}")]
JsonRpcConnectorError(#[from] zaino_fetch::jsonrpc::error::JsonRpcConnectorError),

/// Error from the mempool.
#[error("Mempool error: {0}")]
MempoolError(#[from] MempoolError),

/// RPC error in compatibility with zcashd.
#[error("RPC error: {0:?}")]
RpcError(#[from] zaino_fetch::jsonrpc::connector::RpcError),
Expand Down Expand Up @@ -132,6 +136,9 @@ impl From<FetchServiceError> for tonic::Status {
FetchServiceError::JsonRpcConnectorError(err) => {
tonic::Status::internal(format!("JsonRpcConnector error: {}", err))
}
FetchServiceError::MempoolError(err) => {
tonic::Status::internal(format!("Mempool error: {}", err))
}
FetchServiceError::RpcError(err) => {
tonic::Status::internal(format!("RPC error: {:?}", err))
}
Expand Down Expand Up @@ -160,3 +167,45 @@ impl From<FetchServiceError> for tonic::Status {
}
}
}

/// Errors related to the `StateService`.
#[derive(Debug, thiserror::Error)]
pub enum MempoolError {
/// Custom Errors. *Remove before production.
#[error("Custom error: {0}")]
Custom(String),

/// Error from a Tokio JoinHandle.
#[error("Join error: {0}")]
JoinError(#[from] tokio::task::JoinError),

/// Error from JsonRpcConnector.
#[error("JsonRpcConnector error: {0}")]
JsonRpcConnectorError(#[from] zaino_fetch::jsonrpc::error::JsonRpcConnectorError),

/// Error from a Tokio Watch Reciever.
#[error("Join error: {0}")]
WatchRecvError(#[from] tokio::sync::watch::error::RecvError),

/// Unexpected status-related error.
#[error("Status error: {0:?}")]
StatusError(StatusError),

/// Error from sending to a Tokio MPSC channel.
#[error("Send error: {0}")]
SendError(
#[from]
tokio::sync::mpsc::error::SendError<
Result<(crate::mempool::MempoolKey, crate::mempool::MempoolValue), StatusError>,
>,
),

/// A generic boxed error.
#[error("Generic error: {0}")]
Generic(#[from] Box<dyn std::error::Error + Send + Sync>),
}

/// A general error type to represent error StatusTypes.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Unexpected status error: {0:?}")]
pub struct StatusError(pub crate::status::StatusType);
Loading

0 comments on commit 3743544

Please sign in to comment.