[redis-rs][core] Move connection refresh to the background #2915
base: main
Conversation
Signed-off-by: GilboaAWS <[email protected]>
Force-pushed from 31cf02f to f850d99
This is a substantial change in the core mechanics. It must include tests that fail without this change to prove its necessity.
Instead of splitting your state between multiple maps and leaving room for error in reading the wrong map, consider just using an enum representing each connection state (Usable, Reconnecting, etc.) in the main connection map. See
Signed-off-by: GilboaAWS <[email protected]>
Force-pushed from ffbd018 to 30b35f2
First comments (stopped before fn update_refreshed_connection).
Please see all inline comments, and check how it behaves in the cases where refresh_connections is called with only the management connection, only the user connection, or both. Also, maybe I haven't got to it yet, but the refresh task should be bound to the lifetime of the ClusterNode / address, so that it is cancelled when refresh_slots removes it from the topology.
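A minimal sketch of one way to do that, assuming the task handle is kept next to the node entry (RefreshGuard and its field are hypothetical names, not from this PR): dropping the guard when the node leaves the topology cancels the in-flight refresh task.

use tokio::task::JoinHandle;

// Hypothetical guard: dropping it (e.g. when refresh_slots removes the
// address from the topology) aborts the in-flight refresh task.
struct RefreshGuard {
    handle: JoinHandle<()>,
}

impl Drop for RefreshGuard {
    fn drop(&mut self) {
        // abort() cancels the spawned task at its next await point.
        self.handle.abort();
    }
}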
// Holds all the failed addresses that started a refresh task.
pub(crate) refresh_addresses_started: DashSet<String>,
// Follow the refresh ops on the connections
pub(crate) refresh_operations: DashMap<String, RefreshState<Connection>>,
As we discussed, instead of using RefreshState, use a ConnectionState that internally holds the operation / connection (same as in reconnectingConnection). For example:
enum ConnectionState<Connection> {
    Connected(ConnectionDetails<Connection>),
    Reconnecting(JoinHandle<()>),
}

pub struct ClusterNode<Connection> {
    pub user_connection: ConnectionState<Connection>,
    pub management_connection: Option<ConnectionState<Connection>>,
}
or
enum ConnectionState<Connection> {
    Connected(ClusterNode<Connection>),
    Reconnecting(JoinHandle<()>),
}

connection_map: DashMap<String, ConnectionState<Connection>>,
refresh_connections can be called for only the user connection, only the management connection, or for both, so you should make sure this solution covers all cases.
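For illustration, a sketch of what covering the three cases could look like with the per-node variant above (RefreshConnectionType and the handle names are assumptions, not taken from this PR's code):

// Hypothetical refresh kinds; the match must update exactly the
// connections being refreshed, or a partial refresh leaks stale state.
enum RefreshConnectionType {
    OnlyUserConnection,
    OnlyManagementConnection,
    AllConnections,
}

match refresh_type {
    RefreshConnectionType::OnlyUserConnection => {
        node.user_connection = ConnectionState::Reconnecting(user_handle);
    }
    RefreshConnectionType::OnlyManagementConnection => {
        // Only replace the management connection if the node has one.
        if let Some(slot) = node.management_connection.as_mut() {
            *slot = ConnectionState::Reconnecting(mgmt_handle);
        }
    }
    RefreshConnectionType::AllConnections => {
        node.user_connection = ConnectionState::Reconnecting(user_handle);
        node.management_connection = Some(ConnectionState::Reconnecting(mgmt_handle));
    }
}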
// Follow the refresh ops on the connections
pub(crate) refresh_operations: DashMap<String, RefreshState<Connection>>,
// Holds all the refreshed addresses that are ready to be inserted into the connection_map
pub(crate) refresh_addresses_done: DashSet<String>,
I don't see a good reason for using DashSet or DashMap; these structs are only going to be used from a single point, without concurrency.
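For example, under the assumption that the container is only touched from the single driver task, plain std collections would do (RefreshState is the PR's type, stubbed here only so the sketch stands alone; RefreshTracking is a hypothetical wrapper name):

use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;

// Stub standing in for the PR's RefreshState type.
pub(crate) struct RefreshState<Connection>(PhantomData<Connection>);

// The same bookkeeping fields without the Dash* wrappers.
pub(crate) struct RefreshTracking<Connection> {
    pub(crate) refresh_addresses_started: HashSet<String>,
    pub(crate) refresh_operations: HashMap<String, RefreshState<Connection>>,
}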
@@ -139,6 +146,13 @@ pub(crate) struct ConnectionsContainer<Connection> {
    pub(crate) slot_map: SlotMap,
    read_from_replica_strategy: ReadFromReplicaStrategy,
    topology_hash: TopologyHash,

    // Holds all the failed addresses that started a refresh task.
    pub(crate) refresh_addresses_started: DashSet<String>,
- You can use the completed addresses to hold the newly created cluster node.
- Instead of throwing all the refresh structs directly into the connection container, it might be better to wrap all updates in a struct, so that later on we'll be able to add refresh slots or other updates there too.
For example:
use std::collections::{HashMap, HashSet};

struct RefreshUpdates<Connection> {
    in_progress_addresses: HashSet<String>,
    completed_addresses: HashMap<String, ClusterNode<Connection>>,
    // refreshed_slot_map: Option<SlotMap>,
}

impl<Connection> RefreshUpdates<Connection> {
    pub(crate) fn in_progress(&self) -> &HashSet<String> {
        &self.in_progress_addresses
    }

    pub(crate) fn add_refresh(&mut self, address: String) {
        self.in_progress_addresses.insert(address);
    }

    pub(crate) fn complete_refresh(&mut self, address: String, node: ClusterNode<Connection>) {
        if !self.in_progress_addresses.remove(&address) {
            warn!("Attempted to complete refresh for an address not in progress: {}", address);
        }
        self.completed_addresses.insert(address, node);
    }

    pub(crate) fn completed_addresses(&mut self) -> HashMap<String, ClusterNode<Connection>> {
        std::mem::take(&mut self.completed_addresses)
    }
}
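A hypothetical call site, just to show the intended flow (the field and variable names here are assumed, not from the PR): the poll loop drains the completed refreshes and installs the fresh nodes.

// completed_addresses() empties the map via std::mem::take, so each
// refreshed node is installed exactly once.
for (address, node) in container.refresh_updates.completed_addresses() {
    container.connection_map.insert(address, node);
}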
    None
};
for address in addresses {
    if refresh_ops_map.contains_key(&address) {
Let's have an API in the connection_container for "is_reconnecting(address)".
Then you can use filter and for_each:

addresses
    .into_iter()
    .filter(|address| !connections_container.is_reconnecting(address))
    .for_each(|address| {
        let handle = tokio::spawn(async move {
            info!("Refreshing connection task to {:?} started", address);
            ...
        });
    });
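One possible shape for that API, assuming connection_map stores the ConnectionState enum suggested earlier (a sketch, not the PR's code):

// Returns true when the entry for `address` is mid-refresh.
pub(crate) fn is_reconnecting(&self, address: &str) -> bool {
    matches!(
        self.connection_map.get(address).as_deref(),
        Some(ConnectionState::Reconnecting(_))
    )
}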
"Refreshing connection task to {:?} started", | ||
address_clone_for_task | ||
); | ||
let _ = async { |
Why do you wrap it with an async block again?
}
debug!("refresh connections completed");
debug!("refresh connection tasts initiated");
debug!("refresh connection tasts initiated"); | |
debug!("refresh connection tasks initiated"); |
@@ -1798,9 +1858,8 @@ where
        if !failed_connections.is_empty() {
            Self::refresh_connections(
                inner,
-               failed_connections,
+               failed_connections.into_iter().collect::<HashSet<String>>(),
It creates an unnecessary copy. Instead, you can change failed_connections to be a set to begin with.
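For example, a sketch of collecting the failed addresses straight into a set (the surrounding names are assumed, not from the diff):

use std::collections::HashSet;

// Build the set once while scanning the results, instead of building a
// Vec and converting it to a HashSet afterwards.
let failed_connections: HashSet<String> = results
    .into_iter()
    .filter_map(|(address, result)| result.is_err().then_some(address))
    .collect();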
Signed-off-by: GilboaAWS <[email protected]>
Force-pushed from 01a0dbd to b2caf01
Signed-off-by: GilboaAWS <[email protected]>
Force-pushed from 40744cb to 4e6535f
Signed-off-by: GilboaAWS <[email protected]>
This reverts commit 4e6535f. Signed-off-by: GilboaAWS <[email protected]>
…ils because all connections are dropped and the refresh task takes longer than the request timeout. Signed-off-by: GilboaAWS <[email protected]>
…eturned the refresh_connection logic of sending the connection, but without removing it from the connection_map. Signed-off-by: GilboaAWS <[email protected]>
Issue link
This Pull Request is linked to issue (URL): [#2910]
Checklist
Before submitting the PR, make sure the following are checked: