Retries
The retries mechanism allows you to retry a failing operation according to a given configuration (e.g. retry 3 times with a 100ms delay between attempts).
API
The basic syntax for retries is:
import ox.resilience.retry
retry(config)(operation)
The retry API uses scheduled underneath, with a DSL focused on retries. See scheduled for more details.
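To make the retry(config)(operation) shape concrete, here is a minimal, self-contained sketch of what a fixed-delay retry does conceptually. It is not Ox's implementation (which is built on scheduled): simpleRetry and its parameters are hypothetical, and the sketch only covers the direct variant with a fixed number of retries and a fixed delay.

```scala
import scala.concurrent.duration.*
import scala.util.Try

// Hypothetical, simplified retry loop (NOT Ox's implementation).
// Runs `operation` once, then retries it up to `maxRetries` times,
// sleeping `delay` between attempts. If every attempt throws, the
// last exception is rethrown.
def simpleRetry[T](maxRetries: Int, delay: FiniteDuration)(operation: => T): T =
  var retriesLeft = maxRetries
  var result: Try[T] = Try(operation) // initial attempt
  while result.isFailure && retriesLeft > 0 do
    Thread.sleep(delay.toMillis)
    retriesLeft -= 1
    result = Try(operation) // a retry
  result.get // the value, or rethrows the last (non-fatal) exception
```

Note that the operation is passed by-name, so each evaluation of operation inside the loop runs it again.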
Operation definition
The operation can be provided directly using a by-name parameter, i.e. f: => T.
There’s also a retryEither variant which accepts a by-name Either[E, T], i.e. f: => Either[E, T], as well as a retryWithErrorMode variant which accepts arbitrary error modes, taking the computation in an F context: f: => F[T].
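One way to picture how the variants relate: a loop over Either outcomes can be the core, and the direct variant can be expressed on top of it by catching exceptions into a Left. The sketch below is hypothetical (retryEitherSketch and retryDirectSketch are illustrative names, with no delays and no result policy) and leaves out the error-mode variant.

```scala
import scala.util.Try

// Hypothetical core loop over Either outcomes: retries while the outcome is a Left,
// up to `maxRetries` additional attempts.
def retryEitherSketch[E, T](maxRetries: Int)(operation: => Either[E, T]): Either[E, T] =
  var retriesLeft = maxRetries
  var result = operation
  while result.isLeft && retriesLeft > 0 do
    retriesLeft -= 1
    result = operation
  result

// Hypothetical direct variant: the error type is fixed to Throwable.
// Non-fatal exceptions become a Left; the final error is rethrown.
def retryDirectSketch[T](maxRetries: Int)(operation: => T): T =
  retryEitherSketch[Throwable, T](maxRetries)(Try(operation).toEither) match
    case Right(value) => value
    case Left(error)  => throw error
```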
Configuration
A retry config consists of three parts:
- a Schedule, which indicates how many times and with what delay we should retry the operation after an initial failure,
- a ResultPolicy, which indicates whether:
  - a non-erroneous outcome of the operation should be considered a success (if not, the operation would be retried),
  - an erroneous outcome of the operation should be retried or fail fast,
- an onRetry callback function, which is invoked after each attempt to execute the operation. It can be used to perform any necessary actions or checks after each attempt, regardless of whether the attempt was successful or not.
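The three parts can be modelled as plain data, with a loop that consults each of them after every attempt. This is a hypothetical sketch: SketchSchedule, SketchPolicy, SketchConfig and runWithConfig are illustrative names, the schedule is reduced to maxRetries plus a fixed delay, and Ox's actual RetryConfig may differ in shape.

```scala
import scala.concurrent.duration.*

// Hypothetical models of the three parts of a retry config (not Ox's types).
final case class SketchSchedule(maxRetries: Int, delay: FiniteDuration)

final case class SketchPolicy[E, T](
    isSuccess: T => Boolean = (_: T) => true,
    isWorthRetrying: E => Boolean = (_: E) => true
)

final case class SketchConfig[E, T](
    schedule: SketchSchedule,
    policy: SketchPolicy[E, T] = SketchPolicy[E, T](),
    onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => ()
)

// Should another attempt be made, according to the result policy alone?
def shouldRetry[E, T](policy: SketchPolicy[E, T], result: Either[E, T]): Boolean =
  result match
    case Right(value) => !policy.isSuccess(value)
    case Left(error)  => policy.isWorthRetrying(error)

// Runs the operation; the schedule limits retries, the policy decides whether
// to retry, and onRetry is called after every attempt.
def runWithConfig[E, T](config: SketchConfig[E, T])(operation: => Either[E, T]): Either[E, T] =
  var attempt = 1
  var result = operation
  config.onRetry(attempt, result)
  var retriesLeft = config.schedule.maxRetries
  while shouldRetry(config.policy, result) && retriesLeft > 0 do
    retriesLeft -= 1
    Thread.sleep(config.schedule.delay.toMillis)
    attempt += 1
    result = operation
    config.onRetry(attempt, result)
  result
```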
Result policies
A result policy allows you to customize how the results of the operation are treated. It consists of two predicates:
- isSuccess: T => Boolean (default: true) - determines whether a non-erroneous result of the operation should be considered a success. When it evaluates to true, no further attempts are made; otherwise, we keep retrying. With finite schedules (i.e. those with maxRetries defined), if isSuccess keeps returning false when maxRetries are reached, the result is returned as-is, even though it’s considered “unsuccessful”,
- isWorthRetrying: E => Boolean (default: true) - determines whether another attempt would be made if the operation results in an error E. When it evaluates to true, we keep retrying; otherwise, we fail fast with the error.
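The rules above can be summarised as a small decision function. This is a hypothetical sketch (Decision and decide are not part of Ox), assuming the schedule reports how many retries are left. Note how an “unsuccessful” result is returned as-is once retries run out, while an error that is not worth retrying, or that runs out of retries, ends with the error.

```scala
// Hypothetical outcome of evaluating a result policy after one attempt.
enum Decision:
  case ReturnResult, Retry, FailWithError

def decide[E, T](
    outcome: Either[E, T],
    retriesLeft: Int,
    isSuccess: T => Boolean,
    isWorthRetrying: E => Boolean
): Decision =
  outcome match
    case Right(value) if isSuccess(value) => Decision.ReturnResult
    // "unsuccessful" result: retry while the schedule allows, otherwise return it as-is
    case Right(_) => if retriesLeft > 0 then Decision.Retry else Decision.ReturnResult
    case Left(error) if isWorthRetrying(error) && retriesLeft > 0 => Decision.Retry
    // error not worth retrying (fail fast), or retries exhausted
    case Left(_) => Decision.FailWithError
```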
The ResultPolicy[E, T] is generic both over the error (E) and result (T) type. Note, however, that for the direct variant retry, the error type E is fixed to Throwable, while for the Either and error-mode variants, E can be an arbitrary type.
On retry
The callback function has the following signature:
(Int, Either[E, T]) => Unit
Where:
- the first parameter, an Int, represents the attempt number of the retry operation,
- the second parameter, an Either[E, T], represents the result of the retry operation: Left represents an error and Right represents a successful result.
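The following hypothetical sketch shows a callback with the (Int, Either[E, T]) => Unit shape being invoked after every attempt, here used to collect a log. retryWithCallback is an illustrative name, not Ox's API, and numbering attempts from 1 is an assumption of the sketch.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical retry loop that calls `onRetry(attempt, outcome)` after every attempt,
// including the final one. Retries while the outcome is a Left, up to `maxRetries` times.
def retryWithCallback[E, T](maxRetries: Int, onRetry: (Int, Either[E, T]) => Unit)(
    operation: => Either[E, T]
): Either[E, T] =
  var attempt = 1
  var result = operation
  onRetry(attempt, result)
  while result.isLeft && attempt <= maxRetries do
    attempt += 1
    result = operation
    onRetry(attempt, result)
  result

// A callback that records each attempt as a log line.
val log = ListBuffer.empty[String]
val logAttempt: (Int, Either[String, Int]) => Unit = (attempt, outcome) =>
  val line = outcome match
    case Left(error)  => s"attempt $attempt failed: $error"
    case Right(value) => s"attempt $attempt succeeded: $value"
  log += line
  ()
```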
API shorthands
When you don’t need to customize the result policy (i.e. use the default one) or use complex schedules, you can use one of the following shorthands to define a retry config with a given schedule:
- RetryConfig.immediate(maxRetries: Int),
- RetryConfig.immediateForever,
- RetryConfig.delay(maxRetries: Int, delay: FiniteDuration),
- RetryConfig.delayForever(delay: FiniteDuration),
- RetryConfig.backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter),
- RetryConfig.backoffForever(initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter).
See scheduled for details on how to create custom schedules.
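The backoff shorthands take an initial delay, a maximum delay and a jitter strategy. The sketch below shows one common way such delays are computed: double the delay on each retry, cap it at maxDelay, then optionally randomize it (full jitter: uniform in [0, d]; equal jitter: d/2 plus uniform in [0, d/2]). This formula is an assumption based on widely used definitions, not a statement of Ox's exact schedule; backoffDelay and SketchJitter are hypothetical names.

```scala
import scala.concurrent.duration.*
import scala.util.Random

// Hypothetical jitter strategies (Ox's Jitter may offer more, e.g. other variants).
enum SketchJitter:
  case NoJitter, Full, Equal

def backoffDelay(
    retry: Int, // 0 for the first retry, 1 for the second, ...
    initialDelay: FiniteDuration,
    maxDelay: FiniteDuration,
    jitter: SketchJitter,
    random: Random = new Random()
): FiniteDuration =
  // Double the delay for each retry (exponent capped to avoid Long overflow), then cap at maxDelay.
  val exponential = initialDelay.toMillis * (1L << math.min(retry, 30))
  val capped = math.min(exponential, maxDelay.toMillis)
  val millis = jitter match
    case SketchJitter.NoJitter => capped
    case SketchJitter.Full     => random.nextLong(capped + 1)                 // uniform in [0, capped]
    case SketchJitter.Equal    => capped / 2 + random.nextLong(capped / 2 + 1) // in [capped/2, capped]
  millis.millis
```

Jitter spreads out retries from many clients that failed at the same moment, so they don't all hit the downstream system in lockstep.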
If you want to customize a part of the result policy, you can use the following shorthands:
- ResultPolicy.default[E, T] - uses the default settings,
- ResultPolicy.successfulWhen[E, T](isSuccess: T => Boolean) - uses the default isWorthRetrying and the provided isSuccess,
- ResultPolicy.retryWhen[E, T](isWorthRetrying: E => Boolean) - uses the default isSuccess and the provided isWorthRetrying,
- ResultPolicy.neverRetry[E, T] - uses the default isSuccess and fails fast on any error.
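To show how these shorthands relate to the two predicates, here is a hypothetical mirror of them on a minimal policy type. SketchResultPolicy is illustrative and is not Ox's ResultPolicy source.

```scala
// Hypothetical minimal policy type holding the two predicates.
final case class SketchResultPolicy[E, T](isSuccess: T => Boolean, isWorthRetrying: E => Boolean)

object SketchResultPolicy:
  // every result is a success, every error is worth retrying
  def default[E, T]: SketchResultPolicy[E, T] =
    SketchResultPolicy[E, T](_ => true, _ => true)

  // custom success check, default error handling
  def successfulWhen[E, T](isSuccess: T => Boolean): SketchResultPolicy[E, T] =
    SketchResultPolicy[E, T](isSuccess, _ => true)

  // default success check, custom decision on which errors to retry
  def retryWhen[E, T](isWorthRetrying: E => Boolean): SketchResultPolicy[E, T] =
    SketchResultPolicy[E, T](_ => true, isWorthRetrying)

  // default success check, fail fast on any error
  def neverRetry[E, T]: SketchResultPolicy[E, T] =
    SketchResultPolicy[E, T](_ => true, _ => false)
```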
Examples
import ox.UnionMode
import ox.resilience.{retry, retryEither, retryWithErrorMode, ResultPolicy, RetryConfig}
import ox.scheduling.{Jitter, Schedule}
import scala.concurrent.duration.*
def directOperation: Int = ???
def eitherOperation: Either[String, Int] = ???
def unionOperation: String | Int = ???
// various operation definitions - same syntax
retry(RetryConfig.immediate(3))(directOperation)
retryEither(RetryConfig.immediate(3))(eitherOperation)
// various configs with custom schedules and default ResultPolicy
retry(RetryConfig.delay(3, 100.millis))(directOperation)
retry(RetryConfig.backoff(3, 100.millis))(directOperation) // defaults: maxDelay = 1.minute, jitter = Jitter.None
retry(RetryConfig.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(directOperation)
// infinite retries with a default ResultPolicy
retry(RetryConfig.delayForever(100.millis))(directOperation)
retry(RetryConfig.backoffForever(100.millis, 5.minutes, Jitter.Full))(directOperation)
// result policies
// custom success
retry[Int](RetryConfig(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)))(directOperation)
// fail fast on certain errors
retry(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != "fatal error")))(directOperation)
retryEither(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(eitherOperation)
// custom error mode
retryWithErrorMode(UnionMode[String])(
RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(unionOperation)
See the tests in ox.resilience.* for more.
Adaptive retries
A retry strategy backed by a token bucket. Every retry costs a certain amount of tokens from the bucket, and every success causes some tokens to be added back to the bucket. If there are not enough tokens, the retry is not attempted.
This way, retries don’t overload a system that is down due to a systemic failure (such as a bug in the code, excessive load etc.): retries are attempted only as long as there are enough tokens in the bucket; after that, the load on the downstream system is reduced so that it can recover. In contrast, with a “normal” retry strategy, where every operation is retried up to 3 times, a failure causes the load on the system to increase 4 times (the initial attempt plus 3 retries).
For transient failures (component failure, infrastructure issues etc.), retries still work “normally”, as the bucket has enough tokens to cover the cost of multiple retries.
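A token bucket of this kind can be sketched with an atomic counter: the bucket starts full, acquiring tokens fails instead of blocking when too few remain, and releasing tokens never overfills it. SketchTokenBucket is hypothetical and is not Ox's TokenBucket; the compare-and-set loop is just one way to make it thread-safe.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

// Hypothetical thread-safe token bucket (not Ox's TokenBucket implementation).
final class SketchTokenBucket(capacity: Int):
  private val tokens = new AtomicInteger(capacity) // starts full

  def available: Int = tokens.get()

  // Takes `n` tokens if at least `n` are available; never blocks.
  @tailrec
  def tryAcquire(n: Int): Boolean =
    val current = tokens.get()
    if current < n then false
    else if tokens.compareAndSet(current, current - n) then true
    else tryAcquire(n) // another thread changed the count; try again

  // Adds `n` tokens back, capped at the bucket's capacity.
  @tailrec
  def release(n: Int): Unit =
    val current = tokens.get()
    val updated = math.min(capacity, current + n)
    if !tokens.compareAndSet(current, updated) then release(n)
```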
Configuration
To use adaptive retries, create an instance of AdaptiveRetry. These instances are thread-safe and are designed to be shared. Typically, a single instance should be used to proxy access to a single constrained resource.
AdaptiveRetry is parametrized with:
- tokenBucket: TokenBucket - instances of TokenBucket can be shared across multiple instances of AdaptiveRetry,
- failureCost: Int - number of tokens that are needed for a retry in case of a failure,
- successReward: Int - number of tokens that are added back to the token bucket after a success.
RetryConfig and ResultPolicy are defined the same way as with the “normal” retry mechanism; all the configuration from above also applies here.
An instance with the default configuration can be obtained with AdaptiveRetry.default (bucket size = 500, cost for failure = 5 and reward for success = 1).
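With these defaults the arithmetic is simple, assuming the bucket starts full: during a total outage, 500 / 5 = 100 retries can be made before retries stop, and after that, 5 successes return enough tokens for one more retry. The snippet below only checks these numbers with a hypothetical simulation (retriesDuringOutage is an illustrative name); it does not call Ox.

```scala
// Default numbers quoted above.
val bucketSize = 500
val failureCost = 5
val successReward = 1

// Retries a full bucket can fund when nothing ever succeeds.
val retriesFromFullBucket = bucketSize / failureCost

// Successes needed to earn back the cost of a single retry.
val successesPerRetry = failureCost / successReward

// Hypothetical simulation: spend tokens on retries until the bucket can't pay.
def retriesDuringOutage(size: Int, cost: Int): Int =
  var tokens = size
  var retries = 0
  while tokens >= cost do
    tokens -= cost
    retries += 1
  retries
```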
API
AdaptiveRetry exposes three variants of retrying, which correspond to the three variants discussed above: retry, retryEither and retryWithErrorMode.
retry will attempt to retry an operation if it throws an exception; retryEither will additionally retry if the result is a Left. Finally, retryWithErrorMode is the most flexible, and allows retrying operations using custom failure modes (such as union types).
The methods have an additional parameter, shouldPayFailureCost, which determines whether a given outcome of the operation (an Either[E, T]) should be considered a failure in terms of paying the cost of a retry. The cost is paid only if it is decided to retry the operation; it is never paid for a successful operation.
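The decision made after each attempt can be sketched by combining the result policy, the token bucket and a shouldPayFailureCost predicate. This is a hypothetical model (SketchAdaptive and afterAttempt are illustrative names) that ignores the schedule's retry limit and delays; it follows the description above, not Ox's actual source.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of the adaptive retry decision (not Ox's AdaptiveRetry).
final class SketchAdaptive(bucketSize: Int, failureCost: Int, successReward: Int):
  private val tokens = new AtomicInteger(bucketSize) // assumed to start full

  def available: Int = tokens.get()

  /** Called after an attempt; returns true if another attempt should be made. */
  def afterAttempt[E, T](
      outcome: Either[E, T],
      isSuccess: T => Boolean,
      isWorthRetrying: E => Boolean,
      shouldPayFailureCost: Either[E, T] => Boolean
  ): Boolean =
    outcome match
      case Right(value) if isSuccess(value) =>
        // success: reward the bucket (capped at its size), no retry
        tokens.updateAndGet(t => math.min(bucketSize, t + successReward))
        false
      case Left(error) if !isWorthRetrying(error) =>
        false // fail fast; no retry, so no cost is paid
      case _ =>
        // a retry is wanted: pay the cost unless this outcome is exempt
        if shouldPayFailureCost(outcome) then tryAcquire(failureCost) else true

  // Takes `n` tokens if available; returns false (no retry) otherwise.
  private def tryAcquire(n: Int): Boolean =
    val current = tokens.get()
    current >= n && (tokens.compareAndSet(current, current - n) || tryAcquire(n))
```

Exempting an outcome (such as a throttling error) from the cost means it is still retried, but does not drain the bucket.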
Examples
If you want to use this mechanism, you need to run the operation through an instance of AdaptiveRetry:
import ox.UnionMode
import ox.resilience.AdaptiveRetry
import ox.resilience.{ResultPolicy, RetryConfig}
import ox.scheduling.{Jitter, Schedule}
import scala.concurrent.duration.*
def directOperation: Int = ???
def eitherOperation: Either[String, Int] = ???
def unionOperation: String | Int = ???
val adaptive = AdaptiveRetry.default
// various configs with custom schedules and default ResultPolicy
adaptive.retry(RetryConfig.immediate(3))(directOperation)
adaptive.retry(RetryConfig.delay(3, 100.millis))(directOperation)
adaptive.retry(RetryConfig.backoff(3, 100.millis))(directOperation) // defaults: maxDelay = 1.minute, jitter = Jitter.None
adaptive.retry(RetryConfig.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(directOperation)
// result policies
// custom success
adaptive.retry[Int](
RetryConfig(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)))(directOperation)
// fail fast on certain errors
adaptive.retry(
RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != "fatal error")))(directOperation)
adaptive.retryEither(
RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(eitherOperation)
// custom error mode
adaptive.retryWithErrorMode(UnionMode[String])(
RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(unionOperation)
// consider "throttling error" not as a failure that should incur the retry penalty
adaptive.retryWithErrorMode(UnionMode[String])(
RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")),
shouldPayFailureCost = _.fold(_ != "throttling error", _ => true))(unionOperation)