Skip to content

Commit

Permalink
Merge pull request #2750 from subspace/nats-client-improvements
Browse files Browse the repository at this point in the history
Support for request retries, better message size API, queue group support in subscriptions of NATS client
  • Loading branch information
nazar-pc authored May 9, 2024
2 parents e4afea4 + bc51375 commit 3d749ab
Showing 1 changed file with 134 additions and 27 deletions.
161 changes: 134 additions & 27 deletions crates/subspace-farmer/src/cluster/nats_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,27 @@
//! * broadcasts and corresponding subscriptions (for example slot info broadcast)
use async_nats::{
Client, HeaderMap, HeaderValue, PublishError, RequestError, RequestErrorKind, Subject,
SubscribeError, Subscriber, ToServerAddrs,
Client, ConnectOptions, HeaderMap, HeaderValue, PublishError, RequestError, RequestErrorKind,
Subject, SubscribeError, Subscriber, ToServerAddrs,
};
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use derive_more::{Deref, DerefMut};
use futures::{Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use std::any::type_name;
use std::collections::VecDeque;
use std::fmt;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use thiserror::Error;
use tracing::warn;
use tracing::{debug, warn};
use ulid::Ulid;

// TODO: Replace this with querying of the actual value from the server
/// Approximate max message size (a few more bytes will not hurt), the actual limit is expected to
/// be 2M in NATS
pub const APPROXIMATE_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024 * 8 / 10;
const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024;

/// Generic request with associated response.
///
Expand All @@ -52,6 +53,7 @@ pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
/// Used for cases where a large payload that doesn't fit into NATS message needs to be sent or
/// there is a very large number of messages to send. For simple request/response pattern
/// [`GenericRequest`] can be used instead.
// TODO: Sequence numbers for streams, backpressure with acknowledgement
pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
/// Request subject with optional `*` in place of application instance to receive the request
const SUBJECT: &'static str;
Expand Down Expand Up @@ -250,26 +252,75 @@ where
}
}

/// Shared state of [`NatsClient`], stored behind an `Arc`
#[derive(Debug)]
struct Inner {
/// Underlying NATS client that all operations are delegated to
client: Client,
/// Backoff policy that is cloned and reset when a failed request needs to be retried
request_retry_backoff_policy: ExponentialBackoff,
/// Approximate max message size, computed as 90% of the max payload reported by the server
approximate_max_message_size: usize,
}

/// NATS client wrapper that can be used to interact with other Subspace-specific clients
#[derive(Debug, Clone, Deref)]
#[derive(Debug, Clone)]
pub struct NatsClient {
client: Client,
inner: Arc<Inner>,
}

impl From<Client> for NatsClient {
fn from(client: Client) -> Self {
Self { client }
/// Dereferences to the wrapped [`Client`] stored in the shared inner state, so methods of the
/// underlying NATS client can be called on [`NatsClient`] directly
impl Deref for NatsClient {
type Target = Client;

fn deref(&self) -> &Self::Target {
&self.inner.client
}
}

impl NatsClient {
/// Create new instance by connecting to specified addresses
pub async fn new<A: ToServerAddrs>(addrs: A) -> Result<Self, async_nats::Error> {
pub async fn new<A: ToServerAddrs>(
addrs: A,
request_retry_backoff_policy: ExponentialBackoff,
) -> Result<Self, async_nats::Error> {
Self::from_client(
async_nats::connect_with_options(
addrs,
ConnectOptions::default().request_timeout(None),
)
.await?,
request_retry_backoff_policy,
)
}

/// Create new client from existing NATS instance
pub fn from_client(
client: Client,
request_retry_backoff_policy: ExponentialBackoff,
) -> Result<Self, async_nats::Error> {
let max_payload = client.server_info().max_payload;
if max_payload < EXPECTED_MESSAGE_SIZE {
return Err(format!(
"Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \
increase it by specifying max_payload = 2MB or higher number in NATS configuration"
)
.into());
}

let inner = Inner {
client,
request_retry_backoff_policy,
// Allow up to 90%, the rest will be wrapper data structures, etc.
approximate_max_message_size: max_payload * 9 / 10,
};

Ok(Self {
client: async_nats::connect(addrs).await?,
inner: Arc::new(inner),
})
}

/// Approximate max message size (a few more bytes will not hurt), the actual limit is expected
/// to be a bit higher.
///
/// The value is 90% of the max payload reported by the server at the time this client wrapper
/// was created; the remainder is reserved for wrapper data structures, etc.
pub fn approximate_max_message_size(&self) -> usize {
self.inner.approximate_max_message_size
}

/// Make request and wait for response
pub async fn request<Request>(
&self,
Expand All @@ -280,10 +331,50 @@ impl NatsClient {
Request: GenericRequest,
{
let subject = subject_with_instance(Request::SUBJECT, instance);
let message = self
.client
.request(subject.clone(), request.encode().into())
.await?;
let mut maybe_retry_backoff = None;
let message = loop {
match self
.inner
.client
.request(subject.clone(), request.encode().into())
.await
{
Ok(message) => {
break message;
}
Err(error) => {
match error.kind() {
RequestErrorKind::TimedOut | RequestErrorKind::NoResponders => {
// Continue with retries
}
RequestErrorKind::Other => {
return Err(error);
}
}

let retry_backoff = maybe_retry_backoff.get_or_insert_with(|| {
let mut retry_backoff = self.inner.request_retry_backoff_policy.clone();
retry_backoff.reset();
retry_backoff
});

if let Some(delay) = retry_backoff.next_backoff() {
debug!(
%subject,
%error,
request_type = %type_name::<Request>(),
?delay,
"Failed to make request, retrying after some delay"
);

tokio::time::sleep(delay).await;
continue;
} else {
return Err(error);
}
}
}
};

let response =
Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| {
Expand Down Expand Up @@ -313,11 +404,13 @@ impl NatsClient {
let stream_request = StreamRequest::new(request);

let subscriber = self
.inner
.client
.subscribe(stream_request.response_subject.clone())
.await?;

self.client
self.inner
.client
.publish(
subject_with_instance(Request::SUBJECT, instance),
stream_request.encode().into(),
Expand All @@ -340,7 +433,8 @@ impl NatsClient {
where
Notification: GenericNotification,
{
self.client
self.inner
.client
.publish(
subject_with_instance(Notification::SUBJECT, instance),
notification.encode().into(),
Expand All @@ -357,7 +451,8 @@ impl NatsClient {
where
Broadcast: GenericBroadcast,
{
self.client
self.inner
.client
.publish_with_headers(
Broadcast::SUBJECT.replace('*', instance),
{
Expand All @@ -377,23 +472,27 @@ impl NatsClient {
pub async fn subscribe_to_notifications<Notification>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Notification>, SubscribeError>
where
Notification: GenericNotification,
{
self.simple_subscribe(Notification::SUBJECT, instance).await
self.simple_subscribe(Notification::SUBJECT, instance, queue_group)
.await
}

/// Simple subscription that will produce decoded broadcasts, while skipping messages that
/// fail to decode
pub async fn subscribe_to_broadcasts<Broadcast>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>
where
Broadcast: GenericBroadcast,
{
self.simple_subscribe(Broadcast::SUBJECT, instance).await
self.simple_subscribe(Broadcast::SUBJECT, instance, queue_group)
.await
}

/// Simple subscription that will produce decoded messages, while skipping messages that fail to
Expand All @@ -402,15 +501,23 @@ impl NatsClient {
&self,
subject: &'static str,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Message>, SubscribeError>
where
Message: Decode,
{
Ok(SubscriberWrapper {
subscriber: self
.client
.subscribe(subject_with_instance(subject, instance))
.await?,
subscriber: if let Some(queue_group) = queue_group {
self.inner
.client
.queue_subscribe(subject_with_instance(subject, instance), queue_group)
.await?
} else {
self.inner
.client
.subscribe(subject_with_instance(subject, instance))
.await?
},
_phantom: PhantomData,
})
}
Expand Down

0 comments on commit 3d749ab

Please sign in to comment.