Support for request retries, better message size API, queue group support in subscriptions of NATS client #2750

Merged (1 commit) on May 9, 2024
161 changes: 134 additions & 27 deletions crates/subspace-farmer/src/cluster/nats_client.rs
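For orientation, here is a caller-side sketch of the API this diff introduces. It is a hedged illustration rather than code from the PR: the module path, server address, and choice of `ExponentialBackoff::default()` are assumptions, while `NatsClient::new` and `approximate_max_message_size` follow the signatures added below.

```rust
use backoff::ExponentialBackoff;
// Assumed module path, based on the file's location in the repository
use subspace_farmer::cluster::nats_client::NatsClient;

async fn connect() -> Result<NatsClient, async_nats::Error> {
    // Policy used by `NatsClient::request()` when retrying on
    // `TimedOut`/`NoResponders` errors
    let retry_policy = ExponentialBackoff::default();

    // Connects with the request timeout disabled and rejects servers whose
    // `max_payload` is below the expected 2 MiB
    let client = NatsClient::new("nats://127.0.0.1:4222", retry_policy).await?;

    // Roughly 90% of the server's `max_payload`; encoded messages should
    // stay under this size
    let _limit: usize = client.approximate_max_message_size();

    Ok(client)
}
```

With this setup, requests made through the client are retried transparently according to `retry_policy`, as the `request()` changes further down show.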
@@ -14,26 +14,27 @@
//! * broadcasts and corresponding subscriptions (for example slot info broadcast)

use async_nats::{
Client, HeaderMap, HeaderValue, PublishError, RequestError, RequestErrorKind, Subject,
SubscribeError, Subscriber, ToServerAddrs,
Client, ConnectOptions, HeaderMap, HeaderValue, PublishError, RequestError, RequestErrorKind,
Subject, SubscribeError, Subscriber, ToServerAddrs,
};
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use derive_more::{Deref, DerefMut};
use futures::{Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use std::any::type_name;
use std::collections::VecDeque;
use std::fmt;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use thiserror::Error;
use tracing::warn;
use tracing::{debug, warn};
use ulid::Ulid;

// TODO: Replace this with querying of the actual value from the server
/// Approximate max message size (a few more bytes will not hurt), the actual limit is expected to
/// be 2M in NATS
pub const APPROXIMATE_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024 * 8 / 10;
const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024;

/// Generic request with associated response.
///
@@ -52,6 +53,7 @@ pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
/// Used for cases where a large payload that doesn't fit into NATS message needs to be sent or
/// there is a very large number of messages to send. For simple request/response pattern
/// [`GenericRequest`] can be used instead.
// TODO: Sequence numbers for streams, backpressure with acknowledgement
pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
/// Request subject with optional `*` in place of application instance to receive the request
const SUBJECT: &'static str;
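For readers less familiar with these traits, a hypothetical `GenericRequest` implementation might look as follows. The `ExampleRequest`/`ExampleResponse` types and the subject string are invented for illustration, and the associated `Response` type is inferred from its use in `request()` later in this file.

```rust
use parity_scale_codec::{Decode, Encode};

/// Invented request type, for illustration only
#[derive(Debug, Encode, Decode)]
struct ExampleRequest {
    id: u64,
}

/// Invented response type, for illustration only
#[derive(Debug, Encode, Decode)]
struct ExampleResponse {
    payload: Vec<u8>,
}

impl GenericRequest for ExampleRequest {
    // `*` is substituted with the target application instance when sending
    const SUBJECT: &'static str = "subspace.example.*.request";

    type Response = ExampleResponse;
}
```

Requests of this kind go through `NatsClient::request()`, which after this PR retries on `TimedOut` and `NoResponders` errors according to the configured backoff policy.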
@@ -250,26 +252,75 @@
}
}

#[derive(Debug)]
struct Inner {
client: Client,
request_retry_backoff_policy: ExponentialBackoff,
approximate_max_message_size: usize,
}

/// NATS client wrapper that can be used to interact with other Subspace-specific clients
#[derive(Debug, Clone, Deref)]
#[derive(Debug, Clone)]
pub struct NatsClient {
client: Client,
inner: Arc<Inner>,
}

impl From<Client> for NatsClient {
fn from(client: Client) -> Self {
Self { client }
impl Deref for NatsClient {
type Target = Client;

fn deref(&self) -> &Self::Target {
&self.inner.client
}
}

impl NatsClient {
/// Create new instance by connecting to specified addresses
pub async fn new<A: ToServerAddrs>(addrs: A) -> Result<Self, async_nats::Error> {
pub async fn new<A: ToServerAddrs>(
addrs: A,
request_retry_backoff_policy: ExponentialBackoff,
) -> Result<Self, async_nats::Error> {
Self::from_client(
async_nats::connect_with_options(
addrs,
ConnectOptions::default().request_timeout(None),
)
.await?,
request_retry_backoff_policy,
)
}

/// Create new client from existing NATS instance
pub fn from_client(
client: Client,
request_retry_backoff_policy: ExponentialBackoff,
) -> Result<Self, async_nats::Error> {
let max_payload = client.server_info().max_payload;
if max_payload < EXPECTED_MESSAGE_SIZE {
return Err(format!(
"Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \
increase it by specifying max_payload = 2MB or higher number in NATS configuration"
)
.into());
}

let inner = Inner {
client,
request_retry_backoff_policy,
// Allow up to 90%, the rest will be wrapper data structures, etc.
approximate_max_message_size: max_payload * 9 / 10,
};

Ok(Self {
client: async_nats::connect(addrs).await?,
inner: Arc::new(inner),
})
}

/// Approximate max message size (a few more bytes will not hurt), the actual limit is expected
/// to be a bit higher
pub fn approximate_max_message_size(&self) -> usize {
self.inner.approximate_max_message_size
}

/// Make request and wait for response
pub async fn request<Request>(
&self,
@@ -280,10 +331,50 @@ impl NatsClient {
Request: GenericRequest,
{
let subject = subject_with_instance(Request::SUBJECT, instance);
let message = self
.client
.request(subject.clone(), request.encode().into())
.await?;
let mut maybe_retry_backoff = None;
let message = loop {
match self
.inner
.client
.request(subject.clone(), request.encode().into())
.await
{
Ok(message) => {
break message;
}
Err(error) => {
match error.kind() {
RequestErrorKind::TimedOut | RequestErrorKind::NoResponders => {
// Continue with retries
}
RequestErrorKind::Other => {
return Err(error);
}
}

let retry_backoff = maybe_retry_backoff.get_or_insert_with(|| {
let mut retry_backoff = self.inner.request_retry_backoff_policy.clone();
retry_backoff.reset();
retry_backoff
});

if let Some(delay) = retry_backoff.next_backoff() {
debug!(
%subject,
%error,
request_type = %type_name::<Request>(),
?delay,
"Failed to make request, retrying after some delay"
);

tokio::time::sleep(delay).await;
continue;
} else {
return Err(error);
}
}
}
};

let response =
Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| {
Expand Down Expand Up @@ -313,11 +404,13 @@ impl NatsClient {
let stream_request = StreamRequest::new(request);

let subscriber = self
.inner
.client
.subscribe(stream_request.response_subject.clone())
.await?;

self.client
self.inner
.client
.publish(
subject_with_instance(Request::SUBJECT, instance),
stream_request.encode().into(),
@@ -340,7 +433,8 @@ impl NatsClient {
where
Notification: GenericNotification,
{
self.client
self.inner
.client
.publish(
subject_with_instance(Notification::SUBJECT, instance),
notification.encode().into(),
@@ -357,7 +451,8 @@ impl NatsClient {
where
Broadcast: GenericBroadcast,
{
self.client
self.inner
.client
.publish_with_headers(
Broadcast::SUBJECT.replace('*', instance),
{
@@ -377,23 +472,27 @@ impl NatsClient {
pub async fn subscribe_to_notifications<Notification>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Notification>, SubscribeError>
where
Notification: GenericNotification,
{
self.simple_subscribe(Notification::SUBJECT, instance).await
self.simple_subscribe(Notification::SUBJECT, instance, queue_group)
.await
}

/// Simple subscription that will produce decoded broadcasts, while skipping messages that
/// fail to decode
pub async fn subscribe_to_broadcasts<Broadcast>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>
where
Broadcast: GenericBroadcast,
{
self.simple_subscribe(Broadcast::SUBJECT, instance).await
self.simple_subscribe(Broadcast::SUBJECT, instance, queue_group)
.await
}

/// Simple subscription that will produce decoded messages, while skipping messages that fail to
@@ -402,15 +501,23 @@ impl NatsClient {
&self,
subject: &'static str,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Message>, SubscribeError>
where
Message: Decode,
{
Ok(SubscriberWrapper {
subscriber: self
.client
.subscribe(subject_with_instance(subject, instance))
.await?,
subscriber: if let Some(queue_group) = queue_group {
self.inner
.client
.queue_subscribe(subject_with_instance(subject, instance), queue_group)
.await?
} else {
self.inner
.client
.subscribe(subject_with_instance(subject, instance))
.await?
},
_phantom: PhantomData,
})
}
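Finally, a sketch of the new queue-group subscription path: when `Some(queue_group)` is passed, NATS delivers each message to only one member of the group, so identical services can share load instead of each processing every broadcast. The `SlotInfoBroadcast` type, instance name, and group name below are placeholders, and the snippet assumes `SubscriberWrapper` is consumed as a `futures::Stream`, as this file's imports suggest.

```rust
use futures::StreamExt;

// `SlotInfoBroadcast` stands in for any type implementing `GenericBroadcast`
async fn consume_broadcasts(nats_client: &NatsClient) -> Result<(), SubscribeError> {
    let mut subscription = nats_client
        .subscribe_to_broadcasts::<SlotInfoBroadcast>(
            Some("controller-01"),       // optional application instance
            Some("farmers".to_string()), // queue group shared by identical services
        )
        .await?;

    while let Some(broadcast) = subscription.next().await {
        // Messages that fail to decode are skipped by `SubscriberWrapper`
        println!("Received broadcast: {broadcast:?}");
    }

    Ok(())
}
```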