Circuit breaker #266
base: master
Conversation
For now this is incomplete since I have some questions.
Right now only the count-based sliding window is implemented, but I wanted to discuss those questions right away. For time-based sliding windows, I think a background loop evicting older entries from a queue, similar to the SlidingWindow RateLimiterAlgorithm, should be sufficient (a sketch of that idea follows).
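A minimal sketch of that eviction idea, assuming a timestamped queue of call results (all names here are illustrative, not the PR's actual code):

```scala
import scala.collection.mutable
import scala.concurrent.duration.*

// One entry per finished call, stamped with System.nanoTime at registration.
final case class TimestampedResult(nanoTime: Long, failed: Boolean)

// Run periodically from a background fork: drop results older than the window,
// so metrics are computed only over the last `window` of calls.
def evictOlderThan(queue: mutable.Queue[TimestampedResult], window: FiniteDuration): Unit =
  val cutoff = System.nanoTime() - window.toNanos
  while queue.nonEmpty && queue.head.nanoTime < cutoff do
    queue.dequeue()
```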
```scala
if numOfOperations >= minimumNumberOfCalls && (failuresRate >= failureRateThreshold || slowRate >= slowCallThreshold) then
  // Start schedule to switch to HalfOpen after waitDurationOpenState passed
  forkDiscard(
    scheduled(ScheduledConfig[Throwable, Unit](Schedule.InitialDelay(waitDurationOpenState)))(
```
Effects in updateAndGet should be avoided, but I think scheduling the state change twice (in case the function is reapplied) should not break things, especially since we should have the breaker open for some time. But maybe it can create a race condition, where we complete enough calls in the HalfOpen state to change it to Closed, only for the second scheduled task to complete and change it back to HalfOpen.
Hm, this looks fragile. I think we should change the design a little. An actor seems like a natural choice here - the actor state, that is, the state machine, would then always be accessed from a single thread. We could send updates to the actor with the results of operation invocations, and based on them, the internal state would be updated.
One place where we'd have to deviate from the actor pattern is checking if the circuit breaker is open - always going through the actor would be a bottleneck (as all operations would have to synchronize on a single thread). So the actor would also have to update some shared mutable state, which every thread could quickly check (not necessarily immediately consistent - it's fine if we let through one or two additional operations when the CB is closing).
Then each circuit breaker state could be modelled as an immutable value, so we'd only have one top-level mutable overall state, managed by the actor (see the sketch below).
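A hedged sketch of that design - a hand-rolled actor loop over an ox channel, with the state machine owned by a single fork; the names, the event shape and the transition placeholder are all illustrative, not the PR's code:

```scala
import ox.*
import ox.channels.*

enum CBState:
  case Closed, Open, HalfOpen

enum CBEvent:
  case Success, Failure, Slow

class CircuitBreakerActor(events: Source[CBEvent]):
  // Shared, cheaply-readable view of the state for the hot path; written only by the
  // actor fork below (see the later discussion on how this field must be protected).
  private var _state: CBState = CBState.Closed
  def state: CBState = _state

  // Run in a single fork: every state update happens on one thread, so the state
  // machine itself needs no synchronization.
  def run(): Unit =
    forever:
      val event = events.receive()
      _state = transition(_state, event)

  // Placeholder for the real (immutable-value-based) state machine.
  private def transition(s: CBState, e: CBEvent): CBState = s
```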
In the last commits I changed to this approach, but in the second test it seems that on my machine the circuit breaker allows 26-27 operations to run, while it should open after 10. If I change the buffer capacity of the actor to 100, it is able to complete all operations before it opens. When f takes 10ms, only one operation slips through. I think with the actor I can get rid of AtomicCircularBuffer - maybe this creates enough overhead that "fast" operations are able to start faster than state changes.
I also would need the actorRef to schedule a state update from within the state machine, but I can't seem to figure out how to pass it, since they depend on each other.
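One hedged way around such a mutual dependency: create the mailbox first, and give the state machine only a callback that sends to it, rather than the ActorRef itself (names here are illustrative):

```scala
import ox.channels.*

enum SelfMsg:
  case SwitchToHalfOpen

// Create the mailbox before either party exists...
val mailbox: Channel[SelfMsg] = Channel.unlimited[SelfMsg]

// ...the state machine gets a plain callback instead of the full actor reference:
val scheduleSelf: SelfMsg => Unit = msg => mailbox.send(msg)

// ...and the actor loop consumes from the same mailbox, breaking the cycle.
```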
As for the questions:
An answer based on what is available in Pekko/Akka, Monix, rezilience (ZIO), circuit (cats-effect) and Polly (C#):
Monix, circuit and Pekko/Akka work exactly the same: they count failures (or slow calls) in a row, not a rate based on a window, then wait before going to halfOpen, and then decide based on a single operation result. The wait duration before transitioning to halfOpen is configured as a backoff.
rezilience provides maxFailures in a row, just like Monix, and also a count-based sliding window. It supports different schedules for waiting before going to the halfOpen state, and likewise allows only one call to decide whether the breaker goes back to open or closed.
Polly is a little different, but only in a few cases. It provides threshold rates for a sampling window of some duration - as I understand it, that in effect means a sliding window (but maybe it is simpler and works just like a fixed window). It also supports a minimum number of calls in a sample before the breaker can trip, allows dynamically determining the break duration before switching to halfOpen, and provides the ability to set the state manually and to read the current state through CircuitBreakerStateProvider.
ZIO, resilience4j and rezilience also provide the ability to consume different events, like state changes or metrics.
```scala
case Slow

case class Metrics(
  failureRate: Int,
```
it would be good to name it or at least comment to be more precise on what "rate" is - I assume calls per second? or another unit? :)
It's the percentage of failed calls in the last window (see the sketch below). I wanted to move the state logic to a parent trait and only leave calculating metrics to the different implementations.
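For concreteness, a sketch of the metric as described - the share of failed calls among the calls recorded in the window (illustrative names, not the PR's code):

```scala
// Percentage (0-100) of failed calls among the last `totalCalls` recorded calls.
def failureRatePercent(failures: Int, totalCalls: Int): Int =
  if totalCalls == 0 then 0 else failures * 100 / totalCalls
```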
```scala
)

enum SlidingWindow:
  case CountBased(windowSize: Int)
```
these `Int`s are quite opaque ... maybe we could use type aliases or opaque types to make them more meaningful?
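A minimal sketch of the opaque-type suggestion (the names are assumptions, not from the PR):

```scala
object CircuitBreakerTypes:
  // The number of calls kept in a count-based window - no longer a bare Int at use sites.
  opaque type WindowSize = Int
  object WindowSize:
    def apply(n: Int): WindowSize =
      require(n > 0, "window size must be positive")
      n
  extension (w: WindowSize) def toInt: Int = w

// Usage: SlidingWindow.CountBased(WindowSize(100)) instead of CountBased(100)
```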
```scala
end if
end runOrDrop

def runEitherOrDrop[E, T](resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T])(
```
shouldn't this just delegate to `runOrDrop` using the either error mode? Also, we need the "basic" case of `runOrDrop(op: => T): T`, where failures are exceptions
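A hedged sketch of what that delegation might look like. The `ErrorMode`/`EitherMode` machinery here is a self-contained stand-in for ox's own (as used by its retry API), and the `runOrDrop` signature and body are placeholders, just to make the delegation idea concrete:

```scala
// Minimal stand-ins so the sketch compiles on its own:
trait ErrorMode[E, F[_]]:
  def isError[T](f: F[T]): Boolean

class EitherMode[E] extends ErrorMode[E, [T] =>> Either[E, T]]:
  def isError[T](f: Either[E, T]): Boolean = f.isLeft

trait ResultPolicy[E, T]
object ResultPolicy:
  def default[E, T]: ResultPolicy[E, T] = new ResultPolicy[E, T] {}

class CircuitBreakerSketch:
  // assumed generic variant, parameterized by an error mode:
  def runOrDrop[E, F[_], T](em: ErrorMode[E, F])(resultPolicy: ResultPolicy[E, T])(op: => F[T]): F[T] =
    op // placeholder for the real circuit-breaker logic

  // the Either-based variant then just delegates, as suggested:
  def runEitherOrDrop[E, T](resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T])(
      op: => Either[E, T]
  ): Either[E, T] =
    runOrDrop(new EitherMode[E])(resultPolicy)(op)
```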
I don't really know why I didn't do that, will fix.
```scala
if acquiredResult.acquired then
  val before = System.nanoTime()
  val result = op
  val after = System.nanoTime()
```
we should have a `timed` utility method in ox, I think we have such time measurements in a couple of places
Honestly, I don't think we have. We have a couple of similar methods as test utilities.
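A minimal sketch of such a `timed` helper, matching the measurement pattern in the diff above (hypothetical - per the comment, ox did not have this utility at the time):

```scala
import scala.concurrent.duration.*

// Runs `op`, returning the wall-clock duration it took together with its result.
def timed[T](op: => T): (FiniteDuration, T) =
  val before = System.nanoTime()
  val result = op
  val after = System.nanoTime()
  ((after - before).nanos, result)
```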
```scala
val after = System.nanoTime()
val duration = (after - before).nanos
// Check the result against the result policy
if em.isError(result) && resultPolicy.isWorthRetrying(em.getError(result)) then
```
I'm wondering if `resultPolicy` applies here. If it's an error, it's an error. Does it matter if the error is retryable when it comes to the CB?
```scala
private val callResults: Array[Option[CircuitBreakerResult]] = Array.fill[Option[CircuitBreakerResult]](windowSize)(None)
private var writeIndex = 0

private var _state: CircuitBreakerState = CircuitBreakerState.Closed
```
this is accessed externally, so needs to be concurrency-protected? with a large disclaimer that although this does break the actor's internal state protection, we know what we're doing ;)
I was wondering if it needs to be atomic. We read concurrently but write only from the same actor's thread
Concurrent reads need to be protected as well. If you don't know why, it might be good to read up on memory barriers, Java's memory model and concurrency primitives :)
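The smallest fix along those lines, for illustration: marking the field `@volatile` establishes a happens-before edge between the actor's write and the readers, so other threads are guaranteed to observe state changes (this is an assumed fix, not necessarily the PR's final code):

```scala
enum CircuitBreakerState:
  case Closed, Open, HalfOpen

class CircuitBreakerStateHolder:
  // Without @volatile, the JVM may let reader threads cache a stale value of _state
  // indefinitely, even though only the actor's thread ever writes to it. @volatile
  // gives visibility without the cost of full synchronization.
  @volatile private var _state: CircuitBreakerState = CircuitBreakerState.Closed

  def state: CircuitBreakerState = _state // read from any thread
  def update(s: CircuitBreakerState): Unit = _state = s // called only from the actor's fork
```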
```scala
def state: CircuitBreakerState = _state

def registerResult(result: CircuitBreakerResult, acquired: AcquireResult): Unit =
  callResults(writeIndex) = Some(result)
```
in fact, it might be good to separate the "pure" state machine update function - which takes the current state + call result as a parameter, and outputs the new state + optional self-callbacks to run (?); from the mutable actor's state, which can be queried externally.
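A hedged sketch of that separation - a pure transition function, with the scheduling signalled as data rather than performed as a side effect; all names and thresholds are illustrative, not the PR's code:

```scala
enum State:
  case Closed, Open, HalfOpen

final case class Metrics(failureRatePercent: Int, slowCallRatePercent: Int, numberOfCalls: Int)

// What the actor should do next: the new state, plus whether to schedule the delayed
// Open -> HalfOpen switch. No side effects happen in here, so this function can be
// unit-tested exhaustively, independently of the actor.
final case class Transition(state: State, scheduleHalfOpenSwitch: Boolean)

def nextState(current: State, m: Metrics, minCalls: Int, failureThreshold: Int, slowThreshold: Int): Transition =
  val shouldTrip = m.numberOfCalls >= minCalls &&
    (m.failureRatePercent >= failureThreshold || m.slowCallRatePercent >= slowThreshold)
  current match
    case State.Closed | State.HalfOpen if shouldTrip => Transition(State.Open, scheduleHalfOpenSwitch = true)
    case other                                       => Transition(other, scheduleHalfOpenSwitch = false)
```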
```scala
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.concurrent.duration.*
import ox.*

class CircuitBreakerTest extends AnyFlatSpec with Matchers:
```
probably too late to do proper TDD, but it might be a good idea to invest some time into a strong test suite, where all (or most) tests fail initially, but which tests the various configuration options that we want to support (thresholds, opening/closing, count-based and time-based windows etc.)
I wanted to test the state machine separately, so now that the interface is more or less clarified, it should be easier to write those tests.
Ah yes, this might be a good idea - but I would then double-down on separating the "pure" and side-effecting/mutating parts. And write a lot of tests for the pure part, and some "integration" ones for the whole package
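For instance, a test of the pure part could look like this, reusing the illustrative `nextState` sketch from above (hypothetical, not from the PR):

```scala
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class StateMachineTest extends AnyFlatSpec with Matchers:
  "nextState" should "trip to Open once the failure-rate threshold is reached" in {
    val metrics = Metrics(failureRatePercent = 60, slowCallRatePercent = 0, numberOfCalls = 10)
    val t = nextState(State.Closed, metrics, minCalls = 10, failureThreshold = 50, slowThreshold = 100)
    t.state shouldBe State.Open
    t.scheduleHalfOpenSwitch shouldBe true
  }

  it should "stay Closed below the minimum number of calls" in {
    val metrics = Metrics(failureRatePercent = 100, slowCallRatePercent = 0, numberOfCalls = 5)
    nextState(State.Closed, metrics, minCalls = 10, failureThreshold = 50, slowThreshold = 100)
      .state shouldBe State.Closed
  }
```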
Good analysis, thanks :) So, based on that, what design would you propose? What would be the configuration options, and the algorithm for transitioning between the closed/half-open/open states? Not sure if we need both count-based and windowed variants - isn't the count-based variant a windowed variant, but with window duration = Inf?
The difference is that if we were to treat count-based as a sliding window with an Inf duration, we would always have to count all results. The window size defines how many of the last n operations we want to include in the metrics. I wanted to move all state machine logic to a base trait, so the only difference between implementations would be how we calculate metrics. If we leave both variants we can have all the functionality (maybe apart from the ability to consume events). Given proper arguments, we can mimic Pekko and Monix behaviour exactly. I am only debating whether we would want to support any infinite schedule when it comes to those durations, but I would want to have a proper implementation of all the other functionality before that, then see if it fits.
But in a count-based approach, you're counting all results anyway?
Yeah, but callResults is a very basic implementation of a CircularBuffer, so we count at most the last n call results and don't hold more results in memory than we need. The writeIndex is incremented during registration of a result (see the sketch below).
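A minimal sketch of that circular-buffer idea (illustrative, not the PR's exact code):

```scala
// Keeps only the last `windowSize` results; older entries are overwritten in place,
// so memory use stays bounded no matter how many calls are registered.
final class CircularResultBuffer[T](windowSize: Int):
  private val results = Array.fill[Option[T]](windowSize)(None)
  private var writeIndex = 0

  def register(result: T): Unit =
    results(writeIndex) = Some(result)
    writeIndex = (writeIndex + 1) % windowSize // wrap around to the oldest slot

  // Metrics are computed over at most the last `windowSize` results:
  def snapshot: Seq[T] = results.toSeq.flatten
```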
This draft implements CircuitBreaker with features based on those provided by the circuit breaker from resilience4j.
Those features, for a count-based window (last n operations):
Things that are not implemented here but are present in resilience4j: