From bc51375625b197a073930acff5a5df2e86b70a49 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 8 May 2024 21:21:35 +0300 Subject: [PATCH] Support for request retries, better message size API, queue group support in subscriptions of NATS client --- .../src/cluster/nats_client.rs | 161 +++++++++++++++--- 1 file changed, 134 insertions(+), 27 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index 60557cc13c..b73099a0aa 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -14,9 +14,11 @@ //! * broadcasts and corresponding subscriptions (for example slot info broadcast) use async_nats::{ - Client, HeaderMap, HeaderValue, PublishError, RequestError, RequestErrorKind, Subject, - SubscribeError, Subscriber, ToServerAddrs, + Client, ConnectOptions, HeaderMap, HeaderValue, PublishError, RequestError, RequestErrorKind, + Subject, SubscribeError, Subscriber, ToServerAddrs, }; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; use derive_more::{Deref, DerefMut}; use futures::{Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; @@ -24,16 +26,15 @@ use std::any::type_name; use std::collections::VecDeque; use std::fmt; use std::marker::PhantomData; +use std::ops::Deref; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use thiserror::Error; -use tracing::warn; +use tracing::{debug, warn}; use ulid::Ulid; -// TODO: Replace this with querying of the actual value from the server -/// Approximate max message size (a few more bytes will not hurt), the actual limit is expected to -/// be 2M in NATS -pub const APPROXIMATE_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024 * 8 / 10; +const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024; /// Generic request with associated response. /// @@ -52,6 +53,7 @@ pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static { /// Used for cases where a large payload that doesn't fit into NATS message needs to be sent or /// there is a very large number of messages to send. For simple request/response patten /// [`GenericRequest`] can be used instead. +// TODO: Sequence numbers for streams, backpressure with acknowledgement pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static { /// Request subject with optional `*` in place of application instance to receive the request const SUBJECT: &'static str; @@ -250,26 +252,75 @@ where } } +#[derive(Debug)] +struct Inner { + client: Client, + request_retry_backoff_policy: ExponentialBackoff, + approximate_max_message_size: usize, +} + /// NATS client wrapper that can be used to interact with other Subspace-specific clients -#[derive(Debug, Clone, Deref)] +#[derive(Debug, Clone)] pub struct NatsClient { - client: Client, + inner: Arc, } -impl From for NatsClient { - fn from(client: Client) -> Self { - Self { client } +impl Deref for NatsClient { + type Target = Client; + + fn deref(&self) -> &Self::Target { + &self.inner.client } } impl NatsClient { /// Create new instance by connecting to specified addresses - pub async fn new(addrs: A) -> Result { + pub async fn new( + addrs: A, + request_retry_backoff_policy: ExponentialBackoff, + ) -> Result { + Self::from_client( + async_nats::connect_with_options( + addrs, + ConnectOptions::default().request_timeout(None), + ) + .await?, + request_retry_backoff_policy, + ) + } + + /// Create new client from existing NATS instance + pub fn from_client( + client: Client, + request_retry_backoff_policy: ExponentialBackoff, + ) -> Result { + let max_payload = client.server_info().max_payload; + if max_payload < EXPECTED_MESSAGE_SIZE { + return Err(format!( + "Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \ + increase it by specifying max_payload = 2MB or higher number in NATS configuration" + ) + .into()); + } + + let inner = Inner { + client, + request_retry_backoff_policy, + // Allow up to 90%, the rest will be wrapper data structures, etc. + approximate_max_message_size: max_payload * 9 / 10, + }; + Ok(Self { - client: async_nats::connect(addrs).await?, + inner: Arc::new(inner), }) } + /// Approximate max message size (a few more bytes will not hurt), the actual limit is expected + /// to be a bit higher + pub fn approximate_max_message_size(&self) -> usize { + self.inner.approximate_max_message_size + } + /// Make request and wait for response pub async fn request( &self, @@ -280,10 +331,50 @@ impl NatsClient { Request: GenericRequest, { let subject = subject_with_instance(Request::SUBJECT, instance); - let message = self - .client - .request(subject.clone(), request.encode().into()) - .await?; + let mut maybe_retry_backoff = None; + let message = loop { + match self + .inner + .client + .request(subject.clone(), request.encode().into()) + .await + { + Ok(message) => { + break message; + } + Err(error) => { + match error.kind() { + RequestErrorKind::TimedOut | RequestErrorKind::NoResponders => { + // Continue with retries + } + RequestErrorKind::Other => { + return Err(error); + } + } + + let retry_backoff = maybe_retry_backoff.get_or_insert_with(|| { + let mut retry_backoff = self.inner.request_retry_backoff_policy.clone(); + retry_backoff.reset(); + retry_backoff + }); + + if let Some(delay) = retry_backoff.next_backoff() { + debug!( + %subject, + %error, + request_type = %type_name::(), + ?delay, + "Failed to make request, retrying after some delay" + ); + + tokio::time::sleep(delay).await; + continue; + } else { + return Err(error); + } + } + } + }; let response = Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| { @@ -313,11 +404,13 @@ impl NatsClient { let stream_request = StreamRequest::new(request); let subscriber = self + .inner .client .subscribe(stream_request.response_subject.clone()) .await?; - self.client + self.inner + .client .publish( subject_with_instance(Request::SUBJECT, instance), stream_request.encode().into(), @@ -340,7 +433,8 @@ impl NatsClient { where Notification: GenericNotification, { - self.client + self.inner + .client .publish( subject_with_instance(Notification::SUBJECT, instance), notification.encode().into(), @@ -357,7 +451,8 @@ impl NatsClient { where Broadcast: GenericBroadcast, { - self.client + self.inner + .client .publish_with_headers( Broadcast::SUBJECT.replace('*', instance), { @@ -377,11 +472,13 @@ impl NatsClient { pub async fn subscribe_to_notifications( &self, instance: Option<&str>, + queue_group: Option, ) -> Result, SubscribeError> where Notification: GenericNotification, { - self.simple_subscribe(Notification::SUBJECT, instance).await + self.simple_subscribe(Notification::SUBJECT, instance, queue_group) + .await } /// Simple subscription that will produce decoded broadcasts, while skipping messages that @@ -389,11 +486,13 @@ impl NatsClient { pub async fn subscribe_to_broadcasts( &self, instance: Option<&str>, + queue_group: Option, ) -> Result, SubscribeError> where Broadcast: GenericBroadcast, { - self.simple_subscribe(Broadcast::SUBJECT, instance).await + self.simple_subscribe(Broadcast::SUBJECT, instance, queue_group) + .await } /// Simple subscription that will produce decoded messages, while skipping messages that fail to @@ -402,15 +501,23 @@ impl NatsClient { &self, subject: &'static str, instance: Option<&str>, + queue_group: Option, ) -> Result, SubscribeError> where Message: Decode, { Ok(SubscriberWrapper { - subscriber: self - .client - .subscribe(subject_with_instance(subject, instance)) - .await?, + subscriber: if let Some(queue_group) = queue_group { + self.inner + .client + .queue_subscribe(subject_with_instance(subject, instance), queue_group) + .await? + } else { + self.inner + .client + .subscribe(subject_with_instance(subject, instance)) + .await? + }, _phantom: PhantomData, }) }