[redis-rs][core] Move connection refresh to the background #2915

Draft
wants to merge 9 commits into
base: main
@@ -2,14 +2,16 @@ use crate::cluster_async::ConnectionFuture;
use crate::cluster_routing::{Route, ShardAddrs, SlotAddr};
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster_topology::TopologyHash;
use dashmap::DashMap;
use dashmap::{DashMap, DashSet};
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::net::IpAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use telemetrylib::Telemetry;

use tokio::task::JoinHandle;

/// Count the number of connections in a connections_map object
macro_rules! count_connections {
($conn_map:expr) => {{
@@ -121,6 +123,11 @@ pub(crate) enum ConnectionType {

pub(crate) struct ConnectionsMap<Connection>(pub(crate) DashMap<String, ClusterNode<Connection>>);

pub(crate) struct RefreshState<Connection> {
pub handle: JoinHandle<()>, // The currently running refresh task
pub node_conn: Option<ClusterNode<Connection>>, // The refreshed connection after the task is done
}

impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for item in self.0.iter() {
@@ -139,6 +146,13 @@ pub(crate) struct ConnectionsContainer<Connection> {
pub(crate) slot_map: SlotMap,
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,

// Holds all the failed addresses that started a refresh task.
pub(crate) refresh_addresses_started: DashSet<String>,
Collaborator

@barshaul barshaul Jan 6, 2025

  1. You can use the completed addresses to hold the newly created cluster node.
  2. Instead of putting all the refresh structs directly into the connections container, it might be better to wrap all updates in a single struct, so that later on we can add refresh slots or other updates there too.
    For example:
use std::collections::{HashMap, HashSet};

struct RefreshUpdates<Connection> {
    // Addresses whose refresh task has started but not yet finished.
    in_progress_addresses: HashSet<String>,
    // Refreshed nodes waiting to be inserted into the connection map.
    completed_addresses: HashMap<String, ClusterNode<Connection>>,
    // refreshed_slot_map: Option<SlotMap>,
}

impl<Connection> RefreshUpdates<Connection> {
    pub(crate) fn in_progress(&self) -> &HashSet<String> {
        &self.in_progress_addresses
    }

    pub(crate) fn add_refresh(&mut self, address: String) {
        self.in_progress_addresses.insert(address);
    }

    pub(crate) fn complete_refresh(&mut self, address: String, node: ClusterNode<Connection>) {
        if !self.in_progress_addresses.remove(&address) {
            warn!("Attempted to complete refresh for an address not in progress: {}", address);
        }
        self.completed_addresses.insert(address, node);
    }

    // Drains the completed nodes, leaving an empty map behind.
    pub(crate) fn completed_addresses(&mut self) -> HashMap<String, ClusterNode<Connection>> {
        std::mem::take(&mut self.completed_addresses)
    }
}
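
The lifecycle suggested above (mark an address as in progress, record the refreshed node, then drain the completed nodes into the connection map) could be exercised roughly as follows. This is a minimal, self-contained sketch rather than the PR's code: `ClusterNode` here is a hypothetical stand-in with a single field, `take_completed` and `apply_completed` are names invented for illustration, and `complete_refresh` returns a `bool` in place of the `warn!` call so the sketch needs no logging crate.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the crate's `ClusterNode<Connection>`.
#[derive(Debug, PartialEq)]
struct ClusterNode<C> {
    user_connection: C,
}

/// Refresh bookkeeping that is only mutated through `&mut self`,
/// so plain std collections are enough.
struct RefreshUpdates<C> {
    in_progress_addresses: HashSet<String>,
    completed_addresses: HashMap<String, ClusterNode<C>>,
}

impl<C> RefreshUpdates<C> {
    fn new() -> Self {
        Self {
            in_progress_addresses: HashSet::new(),
            completed_addresses: HashMap::new(),
        }
    }

    /// Records that a refresh task was started for `address`.
    fn add_refresh(&mut self, address: String) {
        self.in_progress_addresses.insert(address);
    }

    /// Stores the refreshed node. Returns `false` when the address was not
    /// marked as in progress (the original suggestion logs a warning instead).
    fn complete_refresh(&mut self, address: String, node: ClusterNode<C>) -> bool {
        let was_in_progress = self.in_progress_addresses.remove(&address);
        self.completed_addresses.insert(address, node);
        was_in_progress
    }

    /// Drains all completed nodes, leaving an empty map behind.
    fn take_completed(&mut self) -> HashMap<String, ClusterNode<C>> {
        std::mem::take(&mut self.completed_addresses)
    }
}

/// Moves every completed node into the connection map and returns how many
/// entries were applied. Hypothetical helper, not part of the PR.
fn apply_completed<C>(
    updates: &mut RefreshUpdates<C>,
    connection_map: &mut HashMap<String, ClusterNode<C>>,
) -> usize {
    let completed = updates.take_completed();
    let applied = completed.len();
    connection_map.extend(completed);
    applied
}
```

Because `take_completed` swaps in an empty map, a second drain with no new completions applies nothing, which keeps the "apply updates" step idempotent.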

// Follow the refresh ops on the connections
pub(crate) refresh_operations: DashMap<String, RefreshState<Connection>>,
Collaborator

As we discussed, instead of using RefreshState, use a ConnectionState that internally holds either the operation or the connection (the same as in reconnectingConnection). For example:

enum ConnectionState<Connection> {
     Connected(ConnectionDetails<Connection>),
     Reconnecting(JoinHandle<()>),
}

pub struct ClusterNode<Connection> {
    pub user_connection: ConnectionState<Connection>,
    pub management_connection: Option<ConnectionState<Connection>>,
}

or

enum ConnectionState<Connection> {
     Connected(ClusterNode<Connection>),
     Reconnecting(JoinHandle<()>),
}

connection_map: DashMap<String, ConnectionState<Connection>>,

refresh_connections can be called for only the user connection, only the management connection, or both, so make sure this solution covers all of these cases.
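
To make the three cases concrete, here is a rough sketch of the first variant (per-connection state). It is an illustration under stated assumptions, not the crate's implementation: `RefreshTarget` and the method names are hypothetical, the connection type is a bare generic `C` instead of `ConnectionDetails<Connection>`, and `std::thread::JoinHandle` stands in for `tokio::task::JoinHandle` so the example compiles with the standard library alone.

```rust
use std::thread::JoinHandle; // stand-in for tokio::task::JoinHandle

/// Hypothetical selector for which connection(s) a refresh covers.
#[derive(Clone, Copy)]
enum RefreshTarget {
    User,
    Management,
    Both,
}

/// Either a live connection or the handle of the task that is rebuilding it.
enum ConnectionState<C> {
    Connected(C),
    Reconnecting(JoinHandle<()>),
}

impl<C> ConnectionState<C> {
    /// Returns the connection only when it is currently usable.
    fn connected(&self) -> Option<&C> {
        match self {
            ConnectionState::Connected(conn) => Some(conn),
            ConnectionState::Reconnecting(_) => None,
        }
    }
}

struct ClusterNode<C> {
    user_connection: ConnectionState<C>,
    management_connection: Option<ConnectionState<C>>,
}

impl<C> ClusterNode<C> {
    /// Marks the selected connection(s) as reconnecting. `spawn` is called
    /// once per affected connection and must return the refresh task handle.
    /// Any previous handle is simply dropped here; a real implementation
    /// may want to abort the old task first.
    fn begin_refresh(&mut self, target: RefreshTarget, mut spawn: impl FnMut() -> JoinHandle<()>) {
        if matches!(target, RefreshTarget::User | RefreshTarget::Both) {
            self.user_connection = ConnectionState::Reconnecting(spawn());
        }
        if matches!(target, RefreshTarget::Management | RefreshTarget::Both) {
            self.management_connection = Some(ConnectionState::Reconnecting(spawn()));
        }
    }

    /// Installs a refreshed user connection.
    fn complete_user(&mut self, conn: C) {
        self.user_connection = ConnectionState::Connected(conn);
    }

    /// Installs a refreshed management connection.
    fn complete_management(&mut self, conn: C) {
        self.management_connection = Some(ConnectionState::Connected(conn));
    }

    fn user(&self) -> Option<&C> {
        self.user_connection.connected()
    }

    fn management(&self) -> Option<&C> {
        self.management_connection
            .as_ref()
            .and_then(|state| state.connected())
    }
}
```

With per-connection state, refreshing only the management connection leaves the user connection available for routing, which the whole-node variant (`Connected(ClusterNode<Connection>)`) cannot express without extra bookkeeping.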

// Holds all the refreshed addresses that are ready to be inserted into the connection_map
pub(crate) refresh_addresses_done: DashSet<String>,
Collaborator

I don't see a good reason to use DashSet or DashMap here: these structs will only be accessed from a single point, without concurrency.

}

impl<Connection> Drop for ConnectionsContainer<Connection> {
@@ -155,6 +169,9 @@ impl<Connection> Default for ConnectionsContainer<Connection> {
slot_map: Default::default(),
read_from_replica_strategy: ReadFromReplicaStrategy::AlwaysFromPrimary,
topology_hash: 0,
refresh_addresses_started: DashSet::new(),
refresh_operations: DashMap::new(),
refresh_addresses_done: DashSet::new(),
}
}
}
@@ -182,6 +199,9 @@ where
slot_map,
read_from_replica_strategy,
topology_hash,
refresh_addresses_started: DashSet::new(),
refresh_operations: DashMap::new(),
refresh_addresses_done: DashSet::new(),
}
}

@@ -572,6 +592,9 @@ mod tests {
connection_map,
read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()),
topology_hash: 0,
refresh_addresses_started: DashSet::new(),
refresh_operations: DashMap::new(),
refresh_addresses_done: DashSet::new(),
}
}

@@ -628,6 +651,9 @@ mod tests {
connection_map,
read_from_replica_strategy: strategy,
topology_hash: 0,
refresh_addresses_started: DashSet::new(),
refresh_operations: DashMap::new(),
refresh_addresses_done: DashSet::new(),
}
}
