Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit breaker #37

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open

Circuit breaker #37

wants to merge 21 commits into from

Conversation

plazmoid
Copy link
Contributor

Resolve DATA-1853

@plazmoid plazmoid force-pushed the DATA-1853_circuit_breaker branch from edee4bc to 4e3f0f6 Compare January 19, 2023 21:42
@plazmoid plazmoid force-pushed the DATA-1853_circuit_breaker branch from 894f336 to e2190db Compare January 20, 2023 08:37
@plazmoid plazmoid force-pushed the DATA-1853_circuit_breaker branch from e2190db to df2138a Compare January 20, 2023 08:42
Copy link
Contributor

@a-kordys a-kordys left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also probably a simpler API can be designed here.

wavesexchange_repos/Cargo.toml Outdated Show resolved Hide resolved
wavesexchange_repos/src/lib.rs Outdated Show resolved Hide resolved
wavesexchange_repos/Cargo.toml Outdated Show resolved Hide resolved
wavesexchange_repos/Cargo.toml Outdated Show resolved Hide resolved
wavesexchange_repos/Cargo.toml Outdated Show resolved Hide resolved
wavesexchange_repos/src/circuit_breaker/config.rs Outdated Show resolved Hide resolved

Ok(CircuitBreaker {
state: RwLock::new(CBState {
data_source: Arc::new(init_fn()?),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use of putting Arc under Mutex? It is thread-safe by design.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see below

Comment on lines 139 to 142
let state_read_lock = self.state.read().await;
let result = query_fn(state_read_lock.data_source.clone()).await;

drop(state_read_lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks silly - the thread-safe dasta structure such as Arc was put under mutex, even async mutex, and now we need to lock that mutex just to clone the Arc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CB is designed to interfere client's code as less as possible. RwLock is used here to provide interior state mutability, without it, the end client will suffer while refactoring &self to &mut self.
Arc is used to pass repo to query function. I'm not using & due to lifetime problems (query_fn is async)

wavesexchange_repos/src/circuit_breaker/mod.rs Outdated Show resolved Hide resolved
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::RwLock;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the justification of using async mutex here? It seems that it is unnecessary.
Also, what is the justification to use RwLock? It seems that every attempt to lock this mutex requires write access, so a regular Mutex would do.

Copy link
Contributor Author

@plazmoid plazmoid Jan 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can read-only from rwlock if current and previous queries were successful, so we don't need to obtain write lock and reset state every time.
See upper comm about rwlock's purpose here

/// let cb = CircuitBreaker::builder()
/// .with_max_timespan(Duration::from_secs(1))
/// .with_max_err_count_per_timespan(5)
/// .with_init_fn(|| Ok(Repo));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use of this init_fn? Why not to just create that "datasource" once?
What is the real-life example when this overcomplication is really necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deadpool_diesel stops working after failed db query, so its needed to be reinitialized

Fut: Future<Output = Result<T, S::Error>>,
{
let state_read_lock = self.state.read().await;
let result = query_fn(state_read_lock.data_source.clone()).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling (and awaiting) an external async fn while holding lock makes this query() effectively single-threaded which is unacceptable in a real-world application.

Copy link
Contributor Author

@plazmoid plazmoid Jan 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rust docs:
An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.

So, read is not blocking, and write will block thread only if query fn failed

@plazmoid plazmoid requested a review from a-kordys January 20, 2023 14:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants