# Ox

Safe direct-style streaming, concurrency and resiliency for Scala on the JVM. Requires JDK 21+ & Scala 3.

To start using Ox, add the `com.softwaremill.ox::core:1.0.5` [dependency](info/dependency.md) to your project. Then, take a look at the tour of Ox, or follow one of the topics listed in the menu to get to know Ox's API!

In addition to this documentation, ScalaDocs can be browsed at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.ox).

```{include} tour.md
```

## Table of contents

```{eval-rst}
.. toctree::
   :maxdepth: 2
   :caption: Project info

   info/community-support
   info/dependency
   info/scope
   info/ai

.. toctree::
   :maxdepth: 2
   :caption: Basics

   tour
   basics/direct-style
   basics/error-handling

.. toctree::
   :maxdepth: 2
   :caption: High-level concurrency

   high-level-concurrency/par
   high-level-concurrency/race
   high-level-concurrency/collections
   high-level-concurrency/timeout

.. toctree::
   :maxdepth: 2
   :caption: Structured concurrency

   structured-concurrency/index
   structured-concurrency/fork-join
   structured-concurrency/error-handling-scopes
   structured-concurrency/fork-local
   structured-concurrency/interruptions

.. toctree::
   :maxdepth: 2
   :caption: Streaming

   streaming/index
   streaming/flows
   streaming/io
   streaming/channels
   streaming/transforming-channels
   streaming/selecting-from-channels
   streaming/errors
   streaming/backpressure

.. toctree::
   :maxdepth: 2
   :caption: Scheduling

   scheduling/retries
   scheduling/repeat
   scheduling/scheduled

.. toctree::
   :maxdepth: 2
   :caption: Resiliency & utilities

   utils/oxapp
   utils/rate-limiter
   utils/resources
   utils/control-flow
   utils/actors
   utils/circuit-breaker
   utils/utility

.. toctree::
   :maxdepth: 2
   :caption: Integrations

   integrations/kafka
   integrations/mdc-logback
   integrations/cron4s
   integrations/otel-context
   integrations/tapir
   integrations/sttp-client

.. toctree::
   :maxdepth: 2
   :caption: Other topics

   other/stability
   other/links
   other/dictionary
   other/best-practices
   other/performance
   other/compare-gears
   other/compare-funeff
```

# Community & support

## Community

If you have feedback, development ideas or critique, please head to our [community forum](https://softwaremill.community/c/ox/12)! Alternatively, you can create an issue or submit a pull request on [GitHub](https://github.com/softwaremill/ox).

## Sponsors

Development and maintenance of Ox is sponsored by [SoftwareMill](https://softwaremill.com), a software development and consulting company. We help clients scale their business through software. Our areas of expertise include backends, distributed systems, machine learning and data analytics.

[![](https://files.softwaremill.com/logo/logo.png "SoftwareMill")](https://softwaremill.com)

## Commercial Support

We offer commercial support for Ox and related technologies, as well as development services. [Contact us](https://softwaremill.com/contact/) to learn more about our offer!

# Using Ox with AI coding assistants

AI agents are generally quite good at writing Scala 3 + Ox applications. When the tech stack is explicitly specified (that is, it mentions Scala 3, Ox, and other related libraries), agents read Ox's documentation and source code when needed.

However, sometimes agents do need more specific guidance. That is especially true when it comes to direct style, or functional programming in general. For these cases, we've created the [direct-style Scala skill](https://github.com/VirtusLab/scala-skill).

## Context7

[Context7](https://github.com/upstash/context7) is an open-source MCP (Model Context Protocol) server which aims to provide up-to-date documentation for AI coding assistants. You can use the managed, global MCP server, or run your own. Ox's documentation is [indexed on the global server](https://context7.com/softwaremill/ox).

# Dependency (sbt, scala-cli, etc.)
To use Ox core in your project, add:

```scala
// sbt dependency
"com.softwaremill.ox" %% "core" % "1.0.5"

// scala-cli dependency
//> using dep com.softwaremill.ox::core:1.0.5
```

Ox core depends only on the Java [jox](https://github.com/softwaremill/jox) project, where channels are implemented. There are no other direct or transitive dependencies.

Integration modules have separate dependencies.

# Project scope

Ox covers the following areas:

* streaming: push-based backpressured streaming designed for direct-style, with a rich set of stream transformations, flexible stream source & sink definitions and reactive streams integration
* error management: retries, timeouts, a safe approach to error propagation, safe resource management
* concurrency: high-level concurrency operators, developer-friendly structured concurrency, safe low-level primitives, communication between concurrently running computations
* scheduling & timers
* resiliency: circuit breakers, bulkheads, rate limiters, backpressure

Ox enables writing simple, expression-oriented code in functional style. The syntax overhead is kept to a minimum, preserving developer-friendly stack traces, and without compromising performance.
## Inspiration & building blocks

* [Project Loom](https://openjdk.org/projects/loom/) (virtual threads)
* structured concurrency Java APIs ([JEP 505](https://openjdk.org/jeps/505))
* scoped values ([JEP 506](https://openjdk.org/jeps/506))
* fast, scalable [Go](https://golang.org)-like channels using [jox](https://github.com/softwaremill/jox)
* the [Scala 3](https://www.scala-lang.org) programming language

# A tour of Ox

Run two computations [in parallel](high-level-concurrency/par.md):

```scala
def computation1: Int = { sleep(2.seconds); 1 }
def computation2: String = { sleep(1.second); "2" }
val result1: (Int, String) = par(computation1, computation2)
// (1, "2")
```

[Timeout](high-level-concurrency/timeout.md) a computation:

```scala
def computation3: Int = { sleep(2.seconds); 1 }
val result2: Either[TimeoutException, Int] = timeout(1.second)(computation3).catching[TimeoutException]
// `timeout` only completes once the losing branch is interrupted & done
```

[Race](high-level-concurrency/race.md) two computations:

```scala
def computation4: Int = { sleep(2.seconds); 1 }
def computation5: Int = { sleep(1.second); 2 }
val result3: Int = raceSuccess(computation4, computation5)
// as before, the losing branch is interrupted & awaited before returning a result
```

[Structured concurrency](structured-concurrency/fork-join.md) & supervision:

```scala
// equivalent of par
supervised {
  val f1 = fork { sleep(2.seconds); 1 }
  val f2 = fork { sleep(1.second); 2 }
  (f1.join(), f2.join())
}
```

Error handling within a structured concurrency scope:

```scala
supervised {
  forkUser:
    sleep(1.second)
    println("Hello!")

  forkUser:
    sleep(500.millis)
    throw new RuntimeException("boom!")
}
```

[Retry](scheduling/retries.md) a computation:

```scala
def computationR: Int = ???
retry(Schedule.exponentialBackoff(100.millis).maxRetries(4)
  .jitter().maxInterval(5.minutes))(computationR)
```

[Repeat](scheduling/repeat.md) a computation:

```scala
def computationR: Int = ???
repeat(Schedule.fixedInterval(100.millis))(computationR)
```

[Rate limit](utils/rate-limiter.md) computations:

```scala
supervised:
  val rateLimiter = RateLimiter.fixedWindowWithStartTime(2, 1.second)
  rateLimiter.runBlocking({ /* ... */ })
```

Allocate a [resource](utils/resources.md) in a scope:

```scala
supervised {
  val writer = useCloseableInScope(new java.io.PrintWriter("test.txt"))
  // ... use writer ...
} // writer is closed when the scope ends (successfully or with an error)
```

[Create an app](utils/oxapp.md) which shuts down cleanly when interrupted with SIGINT/SIGTERM:

```scala
object MyApp extends OxApp:
  def run(args: Vector[String])(using Ox): ExitCode =
    // ... your app's code ...
    // might use fork {} to create top-level background threads
    ExitCode.Success
```

Simple type-safe [actors](utils/actors.md):

```scala
class Stateful { def increment(delta: Int): Int = ??? }

supervised:
  val ref = Actor.create(new Stateful)
  // ref can be shared across forks, but only within the concurrency scope
  ref.ask(_.increment(5))
```

Create a simple [flow](streaming/flows.md) & transform using a functional API:

```scala
Flow.iterate(0)(_ + 1) // natural numbers
  .filter(_ % 2 == 0)
  .map(_ + 1)
  .intersperse(5)
  // compute the running total
  .mapStateful(0) { (state, value) =>
    val newState = state + value
    (newState, newState)
  }
  .take(10)
  .runForeach(n => println(n.toString))
```

Create flows which perform I/O and manage concurrency:

```scala
def sendHttpRequest(entry: String): Unit = ???
Flow
  .fromInputStream(this.getClass().getResourceAsStream("/list.txt"))
  .linesUtf8
  .mapPar(4)(sendHttpRequest)
  .runDrain()
```

Merge two flows, properly handling the failure of either branch:

```scala
val f1 = Flow.tick(123.millis, "left")
val f2 = Flow.tick(312.millis, "right")
f1.merge(f2).take(100).runForeach(println)
```

Integrate flow with other components using an imperative API:

```scala
def readNextBatch(): List[String] = ???
Flow.usingEmit { emit =>
  forever:
    readNextBatch().foreach(emit.apply)
}
```

Use completable high-performance [channels](streaming/channels.md) for inter-fork communication within concurrency scopes:

```scala
val c = Channel.buffered[String](8)
c.send("Hello,")
c.send("World")
c.done()
```

[Select](streaming/selecting-from-channels.md) from Go-like channels:

```scala
val c = Channel.rendezvous[Int]
val d = Channel.rendezvous[Int]
select(c.sendClause(10), d.receiveClause)
```

[Unwrap eithers](basics/error-handling.md) and combine errors in a union type:

```scala
val v1: Either[Int, String] = ???
val v2: Either[Long, String] = ???

val result: Either[Int | Long, String] = either:
  v1.ok() ++ v2.ok()
```

[Pipe & tap](utils/control-flow.md) values to functions to use the dot-syntax:

```scala
def compute: Int = ???
def computeMore(v: Int): Long = ???
compute
  .pipe(2 * _)
  .tap(println)
  .pipe(computeMore)
```

Dive into the specific documentation sections for more details, variants and functionalities!

# Direct style

## What is direct style?

Direct style is an approach to programming where the results of effectful computations are available directly, without a "wrapper" type such as `Future`, `IO` or `Task`. That way, direct-style programs can leverage the built-in control flow constructs of the language as the basic building blocks of effectful code.

I/O operations and thread synchronisations are executed as if they were blocking operations, even if under the hood they are run asynchronously, using continuations (which matter for throughput & performance). The results of I/O operations are available "directly", as the return values of the appropriate method calls.

Some implementations may require using special syntax. In others, I/O calls are invoked as any other function or method call, and there's no intermediate library-level runtime that is needed.
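To make the contrast with "wrapper" types concrete, here is a minimal sketch that uses only the Scala standard library, not Ox. The functions `fetchUserName`, `fetchScore`, `scoreWrapped` and `scoreDirect` are hypothetical stand-ins for effectful calls, introduced purely for illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

// Hypothetical "effectful" lookups, used only for illustration.
def fetchUserName(id: Int): String = s"user-$id"
def fetchScore(name: String): Int = name.length

// Wrapper style: results live inside Future, so sequencing goes through
// flatMap/map (here written as a for-comprehension).
def scoreWrapped(id: Int)(using ExecutionContext): Future[Int] =
  for
    name <- Future(fetchUserName(id))
    score <- Future(fetchScore(name))
  yield score

// Direct style: results are plain values, sequenced with ordinary statements.
def scoreDirect(id: Int): Int =
  val name = fetchUserName(id)
  fetchScore(name)

@main def directStyleDemo(): Unit =
  given ExecutionContext = ExecutionContext.global
  val wrapped = Await.result(scoreWrapped(7), 5.seconds)
  val direct = scoreDirect(7)
  println(s"wrapped = $wrapped, direct = $direct")
```

Both variants compute the same value. The difference is that the direct-style version can use `val`, `if`, loops and `try` without any combinators.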
## Compiler/runtime support

Because I/O and synchronisations are "blocking", to make direct style efficient dedicated compiler or runtime support is needed. This takes various forms on various platforms:

* [coroutines in Kotlin](https://kotlinlang.org/docs/coroutines-overview.html), where the compiler transforms functions which are "colored" using `suspend` to a finite state machine (using continuation-passing style - CPS)
* similar coloring using `async` is done in [async-await in JavaScript](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function)
* [abilities in Unison](https://www.unison-lang.org/docs/language-reference/abilities-and-ability-handlers) add an algebraic effect system, which is used to guide the CPS transformation
* the [gears for Scala Native](https://github.com/lampepfl/gears) support relies on a runtime implementation of delimited continuations
* also in Scala, direct style is sometimes supported in a localised fashion, by utilizing macros. See [dotty-cps-async](https://github.com/rssh/dotty-cps-async), [async-await for cats-effect](https://typelevel.org/cats-effect/docs/std/async-await), [zio-direct](https://github.com/zio/zio-direct)

Finally, Java 21 introduced [virtual threads](https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html) as part of Project Loom. The goal of Project Loom is to enable programming in direct style on the JVM with performance matching that of reactive and asynchronous libraries, while keeping syntax of Java programs unchanged.

To achieve that, the JVM runtime manages a pool of platform threads, onto which multiple virtual threads are scheduled. Moreover, all blocking operations have been retrofitted to be virtual-thread aware. Virtual threads have a low memory footprint, are cheap to create and fast to switch between.
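As a rough illustration of how cheap virtual threads are, the sketch below starts one virtual thread per task using only the JDK 21 API (`Thread.startVirtualThread`), without Ox. The helper name `runOnVirtualThreads` and the task body are assumptions made for this example.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Starts `tasks` virtual threads, each performing a short blocking call,
// waits for all of them and returns how many completed.
def runOnVirtualThreads(tasks: Int): Int =
  val completed = new AtomicInteger(0)
  val threads = (1 to tasks).map { _ =>
    Thread.startVirtualThread { () =>
      Thread.sleep(10) // blocks only this virtual thread, not its carrier thread
      completed.incrementAndGet()
      ()
    }
  }
  threads.foreach(_.join())
  completed.get()

@main def virtualThreadsDemo(): Unit =
  println(s"completed: ${runOnVirtualThreads(10000)}")
```

Starting the same number of platform threads would be considerably more expensive. With virtual threads, the blocking `Thread.sleep` call simply parks the virtual thread and frees the underlying platform thread for other work.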
## Direct style using Ox

Direct style Scala aims to combine the safety, composability and local reasoning of functional programming with the ease of use and performance of imperative programming. This is a departure from a purely-functional style, as implemented by [cats-effect](https://github.com/typelevel/cats-effect) or [ZIO](https://zio.dev), in favor of running effectful computations imperatively.

Note, however, that in all other aspects direct-style Scala remains functional: using immutable data structures, higher order functions, typeclasses, restricting effects, separating code and data, favoring function composition, etc.

Ox uses the above-mentioned virtual threads in Java 21 to implement a safe approach to concurrency, combined with Go-like channels for inter-thread communication. Moreover, Ox supports and proposes an approach to error handling, along with multiple utility functions providing safe resiliency, resource management, scheduling and others.

The overarching goal of Ox is enabling safe direct-style programming using the power of the Scala 3 language. While still in its early days, a lot of functionality is available in Ox today!

## Other direct-style Scala projects

The wider goal of direct-style Scala is enabling teams to deliver working software quickly and with confidence. Our other projects, including [sttp client](https://sttp.softwaremill.com) and [tapir](https://tapir.softwaremill.com), also include integrations directly tailored towards direct style.

# Error handling

In Ox, we propose distinguishing two kinds of errors:

1. **unrecoverable errors**: bugs, catastrophic failures (e.g. out of memory) and conditions which require the current "processing unit" to be terminated. This might be handling of the current HTTP request (and returning a 500), handling of an incoming MQ message, or simply exiting a CLI. These are untyped **on purpose**, and signalled using exceptions.
That way, implementing control flow based on the specific error type is discouraged.
2. **recoverable/"expected" errors**: anticipated failures, where the code can take specific corrective action based on the error's details. These are fully typed, represented as values - using `Either`s, or as part of a custom data type.

## Unrecoverable errors

Exceptions are always appropriately handled by computation combinators, such as the high-level concurrency operations [`par`](../high-level-concurrency/par.md) and [`race`](../high-level-concurrency/race.md), as well as by [scopes](../structured-concurrency/fork-join.md) and [streams](../streaming/index.md).

The general rule for computation combinators is that using them should throw exactly the same exceptions, as if the provided code was executed without them. That is, no additional exceptions might be thrown, and no exceptions are swallowed. The only difference is that some exceptions might be added as suppressed (e.g. interrupted exceptions).

Some examples of exception handling in Ox include:

* short-circuiting in `par` and `race` when one of the computations fails
* retrying computations in `retry` when they fail
* ending a `supervised` concurrency scope when a supervised fork fails

Exceptions can be handled using the `try/catch/finally` mechanism.

```{note}
An error which is unrecoverable at one level might become recoverable when caught at a higher level - however, only with the context that is accessible at the catch point.
```

## Recoverable errors

Some of the functionalities provided by Ox also support recoverable (application-level) errors. Such errors are represented as values, e.g. the left side of an `Either[MyError, MyResult]`. They are not thrown, but returned from the computations which are orchestrated by Ox.

Ox must be made aware of how such recoverable errors are represented. This is done through an `ErrorMode`.
Provided implementations include `EitherMode[E]` (where left sides of `Either`s are used to represent errors), and `UnionMode[E]`, where a union type of `E` and a successful value is used. Arbitrary user-provided implementations are possible as well.

Error modes can be used in [`supervisedError`](../structured-concurrency/error-handling-scopes.md) scopes, as well as in variants of the `par`, `race`, `retry` methods, and others.

```{note}
Representing recoverable errors as values might incur a syntax overhead, and might be less convenient in some cases. Moreover, all I/O libraries typically throw exceptions - to use them with errors-as-values, one would need to provide a wrapper which converts such exceptions to values, at the boundary where recovery becomes meaningful.
```

## Boundary/break for `Either`s

To streamline working with `Either` values, Ox provides a specialised version of the [boundary/break](https://www.scala-lang.org/api/current/scala/util/boundary$.html) mechanism.

Within a code block passed to `either`, it allows "unwrapping" `Either`s using `.ok()`. The unwrapped value corresponds to the right side of the `Either`, which by convention represents successful computations. In case a failure is encountered (a left side of an `Either`), the computation is short-circuited, and the failure becomes the result.

For example:

```scala
import ox.either
import ox.either.ok

case class User()
case class Organization()
case class Assignment(user: User, org: Organization)

def lookupUser(id1: Int): Either[String, User] = ???
def lookupOrganization(id2: Int): Either[String, Organization] = ???

val result: Either[String, Assignment] = either:
  val user = lookupUser(1).ok()
  val org = lookupOrganization(2).ok()
  Assignment(user, org)
```

You can also use union types to accumulate different types of errors, e.g.:

```scala
import ox.either
import ox.either.ok

val v1: Either[Int, String] = ???
val v2: Either[Long, String] = ???
val result: Either[Int | Long, String] = either:
  v1.ok() ++ v2.ok()
```

Options can be unwrapped as well; the error type is then `Unit`:

```scala
import ox.either
import ox.either.ok

val v1: Option[String] = ???
val v2: Option[Int] = ???

val result: Either[Unit, String] = either:
  v1.ok() * v2.ok()
```

Finally, a forked computation, resulting in an `Either`, can be joined & unwrapped using a single `ok()` invocation:

```scala
import ox.{either, fork, Fork, supervised}
import ox.either.ok

val v1: Either[Int, String] = ???

supervised:
  val forkedResult: Fork[Either[Int, String]] = fork(either(v1.ok()))
  val result: Either[Int, String] = either:
    forkedResult.ok()
```

Failures can be reported using `.fail()`. For example (although a pattern match would be better in such a simple case):

```scala
import ox.either
import ox.either.{fail, ok}

val v1: Either[String, Int] = ???

val result: Either[String, Int] = either:
  if v1.ok() > 10 then 42 else "wrong".fail()
```

### Converting from exceptions

An exception-throwing expression can be converted to an `Either` using the `.catching[E]` extension method (catches only non-fatal exceptions!):

```scala
import ox.either.catching

val userInput: Boolean = ???
val result: Either[IllegalArgumentException, Int] =
  (if userInput then 10 else throw new IllegalArgumentException("boom"))
    .catching[IllegalArgumentException]
```

Any `try-catch` blocks that you have in your code should be kept as small as possible, so that it's as obvious as possible where the errors might originate. Using `.catching` at the sites where exceptions are thrown helps keep the syntax lean and enables pinpointing where exceptions might occur.

An alternative to an `either` block is an `either.catchAll` block which additionally catches any non-fatal exceptions that occur when evaluating the nested expression. Within the block, both `.ok()` and `.fail()` can be used.
The error type within such a block is fixed to `Throwable`:

```scala
import ox.either
import ox.either.ok

def doWork(): Either[Exception, Boolean] = ???

val result: Either[Throwable, String] = either.catchAll:
  if doWork().ok() then "ok" else throw new RuntimeException("not ok")
```

## Converting to exceptions

For `Either` instances where the left side is an exception, the right value of an `Either` can be unwrapped using `.orThrow`. The exception on the left side is thrown if it is present:

```scala
import ox.either.orThrow

val v1: Either[Exception, Int] = Right(10)
assert(v1.orThrow == 10)

val v2: Either[Exception, Int] = Left(new RuntimeException("boom!"))
v2.orThrow // throws RuntimeException("boom!")
```

### Nested `either` blocks

Either blocks cannot be nested in the same scope to prevent surprising failures after refactors. The `.ok()` combinator is typed using inference. Therefore, nesting of `either:` blocks can quickly lead to a scenario where due to a change in the return type of a method, another `either:` block will be selected by the `.ok()` combinator. This could lead to a change in execution semantics without a compile error. Consider:

```scala
import ox.either
import ox.either.*

def returnsEither: Either[String, Int] = ???

val outerResult: Either[Exception, Unit] = either:
  val innerResult: Either[String, Int] = either:
    val i = returnsEither.ok() // this would jump to innerResult on Left
    // ...
    i
  ()
```

Now, after a small refactor of the `returnsEither` return type, the `returnsEither.ok()` expression would still compile, but instead of short-circuiting the inner `either:` block, it would immediately jump to the outer `either:` block on errors.

```scala
import ox.either
import ox.either.*

def returnsEither: Either[Exception, Int] = ???

val outerResult: Either[Exception, Unit] = either:
  val innerResult: Either[String, Int] = either:
    val i = returnsEither.ok() // this would jump to outerResult on Left now!
    // ...
    i
  ()
```

The proper way to solve this is to extract the inner `either:` block to a separate function:

```scala
import ox.either
import ox.either.*

def returnsEither: Either[String, Int] = ???

def inner(): Either[String, Int] = either:
  val i = returnsEither.ok() // this can only jump to either on the opening of this function
  i

val outerResult: Either[Exception, Unit] = either:
  val innerResult = inner()
  ()
```

After this change, refactoring `returnsEither` to return `Either[Exception, Int]` would yield a compile error on `returnsEither.ok()`.

## Mapping errors

When an `Either` is used to represent errors, these can be mapped by using `.left.map`. This can be used to entirely transform the error type, or eliminate some errors in case e.g. a union type is used. For example:

```scala
val e: Either[IllegalArgumentException | String, String] = ???
val e2: Either[String, String] = e.left.map {
  case e: IllegalArgumentException => s"Illegal argument: ${e.getMessage}"
  case other: String => other
}
```

# Running computations in parallel

A number of computations can be run in parallel using the `par` method, for example:

```scala
import ox.{par, sleep}
import scala.concurrent.duration.*

def computation1: Int =
  sleep(2.seconds)
  1

def computation2: String =
  sleep(1.second)
  "2"

val result: (Int, String) = par(computation1, computation2)
// (1, "2")
```

If any of the computations fails, the other is interrupted. In such a case, `par` waits until both branches complete and then re-throws the exception.
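The failure behaviour can be sketched as follows, using only the `par` and `sleep` functions shown above. The names `failing`, `slow` and `outcome` are hypothetical and exist only for this illustration; wrapping the call in `Try` is just a convenient way to inspect the re-thrown exception.

```scala
//> using dep com.softwaremill.ox::core:1.0.5
import ox.{par, sleep}
import scala.concurrent.duration.*
import scala.util.Try

// Fails quickly with an exception.
def failing: Int =
  sleep(100.millis)
  throw new IllegalStateException("boom")

// Would take much longer, but is expected to be interrupted once `failing` throws.
def slow: String =
  sleep(5.seconds)
  "never returned"

// `par` re-throws the exception from the failed branch; Try captures it as a Failure.
val outcome: Try[(Int, String)] = Try(par(failing, slow))
```

Since the slower branch is interrupted rather than awaited to its natural end, `outcome` should be available well before the 5 seconds that `slow` would otherwise take.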
It's also possible to run a sequence of computations given as a `Seq[() => T]` in parallel, optionally limiting the parallelism using `parLimit`:

```scala
import ox.{parLimit, sleep}
import scala.concurrent.duration.*

def computation(n: Int): Int =
  sleep(1.second)
  println(s"Running $n")
  n * 2

val computations = (1 to 20).map(n => () => computation(n))
val result: Seq[Int] = parLimit(5)(computations)
// a sequence of 2, 4, ..., 40
```

## Using application errors

Some values might be considered as application errors. If a computation returns such an error, other computations are interrupted, same as when an exception is thrown. The error is then returned by the `par` method.

It's possible to use an arbitrary [error mode](../basics/error-handling.md) by providing it as the initial argument to `par`. Alternatively, a built-in version using `Either` is available as `parEither`:

```scala
import ox.{parEither, sleep}
import scala.concurrent.duration.*

val result = parEither(
  {
    sleep(200.millis)
    Right("ok")
  }, {
    sleep(100.millis)
    Left(-1)
  }
)

// result is Left(-1), the other branch is interrupted
```

# Parallelize collection operations

Ox contains a number of methods which allow parallelizing operations on collections.

## mapPar

```scala
import ox.mapPar

val input: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

val result: List[Int] = input.mapPar(4)(_ + 1)
// (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
```

If any transformation fails, others are interrupted and `mapPar` rethrows the exception that was thrown by the transformation. Parallelism limits how many concurrent forks are going to process the collection.

## foreachPar

```scala
import ox.foreachPar

val input: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

input.foreachPar(4)(i => println(i))
// prints each element of the list, might be in any order
```

Similar to `mapPar` but doesn't return anything.
## filterPar

```scala
import ox.filterPar

val input: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

val result: List[Int] = input.filterPar(4)(_ % 2 == 0)
// (2, 4, 6, 8, 10)
```

Filters the collection in parallel using the provided predicate. If any predicate fails, the exception is rethrown and the other forks calculating predicates are interrupted.

## collectPar

```scala
import ox.collectPar

val input: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

val result: List[Int] = input.collectPar(4) {
  case i if i % 2 == 0 => i + 1
}
// (3, 5, 7, 9, 11)
```

Similar to `mapPar` but only applies the transformation to elements for which the partial function is defined. Other elements are skipped.

# Race two computations

A number of computations can be raced with each other using the `raceSuccess` method, for example:

```scala
import ox.{raceSuccess, sleep}
import scala.concurrent.duration.*

def computation1: Int =
  sleep(2.seconds)
  1

def computation2: Int =
  sleep(1.second)
  2

val result: Int = raceSuccess(computation1, computation2)
// 2
```

The losing computation is interrupted. `raceSuccess` waits until both branches finish; this also applies to the losing one, which might take a while to clean up after interruption.

It is also possible to race a sequence of computations, given as `Seq[() => T]`.

## Race variants

* `raceSuccess` returns the first success, or if all fail, re-throws the first exception
* `raceResult` returns the result of the first computation to finish, whether it succeeds or fails: a success is returned, an exception is re-thrown (the first computation which finishes in any way is the "winner")

## Using application errors

Some values might be considered as application errors. If a computation returns such an error, `raceSuccess` continues waiting if there are other computations in progress, same as when an exception is thrown. Ultimately, if no result is successful, `raceSuccess` either throws the first exception, or the first application error that has been reported (whichever comes first).
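The "continues waiting" behaviour is easiest to see with the exception case that the paragraph above compares against. The sketch below uses only `raceSuccess` and `sleep` as introduced earlier; the names `failsFast`, `succeedsLater` and `winner` are hypothetical.

```scala
//> using dep com.softwaremill.ox::core:1.0.5
import ox.{raceSuccess, sleep}
import scala.concurrent.duration.*

// Finishes first, but with a failure.
def failsFast: Int =
  sleep(100.millis)
  throw new RuntimeException("boom")

// Finishes later, with a success.
def succeedsLater: Int =
  sleep(300.millis)
  2

// The early failure does not end the race, as another computation is still running.
val winner: Int = raceSuccess(failsFast, succeedsLater)
// 2
```

Had both computations failed, `raceSuccess` would re-throw the first exception instead, as described in the race variants list.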
It's possible to use an arbitrary [error mode](../basics/error-handling.md) by providing it as the initial argument to `raceSuccess`. Alternatively, a built-in version using `Either` is available as `raceEither`:

```scala
import ox.{raceEither, sleep}
import scala.concurrent.duration.*

raceEither({
  sleep(200.millis)
  Left(-1)
}, {
  sleep(500.millis)
  Right("ok")
}, {
  sleep(1.second)
  Right("also ok")
})
```

Here, the example returns `Right("ok")`; the first result is considered an error (a `Left`), and the third computation is cancelled.

# Timeout a computation

```scala
import ox.{sleep, timeout}
import scala.concurrent.duration.DurationInt
import scala.util.Try

def computation: Int =
  sleep(2.seconds)
  1

val result1: Try[Int] = Try(timeout(1.second)(computation)) // failure: TimeoutException
val result2: Try[Int] = Try(timeout(3.seconds)(computation)) // success: 1
```

A variant, `timeoutOption`, doesn't throw a `TimeoutException` on timeout, but returns `None` instead.

# What is structured concurrency?

Structured concurrency is an approach where the lifetime of a thread is determined by the syntactic structure of the code.

First introduced by [Martin Sústrik](https://250bpm.com/blog:71/) and later popularized by [Nathaniel J. Smith](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/), structured concurrency made its way into Python, Kotlin, Java and now Scala.

The basic concept in structured concurrency is the scope, within which concurrently running threads of execution can be started. The scope only finishes once all threads started within finish (either successfully, or with an error). Thus, it isn't possible to "leak" threads outside of a method. Threads become more a method's implementation detail, rather than an effect.

These characteristics make structured concurrency an ideal candidate to make concurrency safer in direct-style programming, while keeping blocking-like method calls.
Structured concurrency enables local reasoning on the threading effects, which is also one of the prime tenets of functional programming!

Ox extends the structured concurrency concepts with various forms of error handling, described in the following sections.

# Error handling in scopes

How errors are handled depends on the type of concurrency scope that is used.

## Supervised scope

The "default" and recommended scope is created using `supervised`. When this scope is used, any fork created using `fork` or `forkUser` that fails with an exception will cause the enclosing scope to end:

```scala
import ox.{forkUser, sleep, supervised}
import scala.concurrent.duration.*

supervised {
  forkUser {
    sleep(100.millis)
    throw new RuntimeException("boom!")
  }
  forkUser {
    // other forks will be interrupted
  }
}
// will re-throw the "boom!" exception
```

If an unsupervised fork fails (created using `forkUnsupervised` / `forkCancellable`), that exception will be thrown when invoking `Fork.join`.

## Supervised scope with application errors

Additionally, supervised scopes can be created with an error mode, which allows ending the scope when a fork returns a value that is an [application error](../basics/error-handling.md). This can be done by using `supervisedError` together with `forkError` or `forkUserError`, for example:

```scala
import ox.{EitherMode, forkUserError, supervisedError}

supervisedError(EitherMode[Int]) {
  forkUserError { Left(10) }
  Right(())
}
// returns Left(10)
```

Even though the body of the scope returns success (a `Right`), the scope ends with an application error (a `Left`), which is reported by a user fork. Note that if we used a daemon fork, the scope might have ended before the error was reported.

Only forks created with `forkError` and `forkUserError` can report application errors, and they **must** return a value of the shape as described by the error mode (in the example above, all `forkError`, `forkUserError` and the scope body must return an `Either[Int, T]` for arbitrary `T`s).
The behavior of `fork` and `forkUser` in `supervisedError` scopes is unchanged, that is, their return values are not inspected.

## Unsupervised scopes

In an unsupervised scope (created using `unsupervised`), failures of the forks won't be reported in any way, unless they are explicitly joined. Hence, if there's no `Fork.join`, the exception might go unnoticed.

# Fork & join threads

It's safest to use higher-level methods, such as `par` or `race`, however this isn't always sufficient. For these cases, threads can be started using the structured concurrency APIs described below.

Forks (new threads) can only be started within a **concurrency scope**. Such a scope is defined using the `supervised`, `supervisedError` or `unsupervised` methods.

The lifetime of the forks is defined by the structure of the code, and corresponds to the enclosing `supervised`, `supervisedError` or `unsupervised` block. Once the code block passed to the scope completes, any forks that are still running are interrupted. The whole block will complete only once all forks have completed (successfully, or with an exception).

Hence, it is guaranteed that all forks started within `supervised`, `supervisedError` or `unsupervised` will finish successfully, with an exception, or due to an interrupt.

For example, the code below is equivalent to `par`:

```scala
import ox.{fork, sleep, supervised}
import scala.concurrent.duration.*

supervised {
  val f1 = fork {
    sleep(2.seconds)
    1
  }

  val f2 = fork {
    sleep(1.second)
    2
  }

  (f1.join(), f2.join())
}
```

It is a compile-time error to use any of the `fork` methods outside of a `supervised` block.
Helper methods can require being run within a scope by taking the `Ox` capability as a context parameter: ```scala import ox.{fork, Fork, Ox, sleep, supervised} import scala.concurrent.duration.* def forkComputation(p: Int)(using Ox): Fork[Int] = fork { sleep(p.seconds) p + 1 } supervised { val f1 = forkComputation(2) val f2 = forkComputation(4) (f1.join(), f2.join()) } ``` Scopes can be arbitrarily nested. ## Types of forks - summary * `fork`: supervised, daemon fork * `forkUser`: supervised, user fork (scope will wait for it to complete, if there are no other errors) * `forkError`: supervised, daemon fork, which is allowed to fail with an application error * `forkUserError`: supervised, user fork, which is allowed to fail with an application error * `forkUnsupervised`: unsupervised fork * `forkCancellable`: unsupervised, cancellable fork ## Supervision The default scope, created with `supervised`, watches over the forks that are started within. Any forks started with `fork`, `forkUser`, `forkError` and `forkUserError` are by default supervised. This means that the scope will end only when either: * all (user, supervised) forks, including the body passed to `supervised`, succeed * or any (supervised) fork, including the body passed to `supervised`, fails Hence an exception in any of the forks will cause the whole scope to end. Ending the scope means that all running forks are cancelled (using interruption). Once all forks complete, the exception is propagated further, that is re-thrown by the `supervised` method invocation: ```scala import ox.{fork, forkUser, Ox, sleep, supervised} import scala.concurrent.duration.* supervised { forkUser { sleep(1.second) println("Hello!") } fork { sleep(500.millis) throw new RuntimeException("boom!") } } // doesn't print "Hello", instead throws "boom!" ``` When using a supervised scope, the main thread is used for supervision, and the provided code block is executed on a new fork (virtual thread).
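The re-throw behaviour described in this section can be observed from the caller's side by wrapping the scope in a `Try`. The following is a minimal sketch that uses only `fork`, `forkUser`, `sleep` and `supervised`; the helper name `runFailingScope` is illustrative and not part of Ox.

```scala
import ox.{fork, forkUser, sleep, supervised}
import scala.concurrent.duration.*
import scala.util.Try

// Hypothetical helper: runs a supervised scope in which a daemon fork fails,
// and captures whatever the `supervised` call throws.
def runFailingScope(): Try[Unit] = Try {
  supervised {
    // user fork: the scope would normally wait for it to complete
    forkUser {
      sleep(1.second)
      println("Hello!")
    }
    // daemon fork: its failure ends the scope and interrupts the user fork
    fork {
      sleep(100.millis)
      throw new RuntimeException("boom!")
    }
    ()
  }
}
```

Calling `runFailingScope()` should yield a `Failure` holding the exception thrown by the daemon fork, since the scope ends as soon as any supervised fork fails.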
## User, daemon and unsupervised forks Forks created using `fork` behave as daemon threads. That is, their failure ends the scope, but the scope will also end once the body and all user forks succeed, regardless of whether the (daemon) fork is still running. Alternatively, a user fork can be created using `forkUser`. Such a fork is required to complete successfully, in order for the scope to end successfully. Hence, when the body of the scope completes, the scope will wait until all user forks have completed as well. Finally, entirely unsupervised forks can be started using `forkUnsupervised`. ## Unsupervised scopes An unsupervised scope can be created using `unsupervised`. Within such scopes, only `forkUnsupervised` and `forkCancellable` forks can be started. Once the code block passed to `unsupervised` completes, the scope ends, that is, all running forks are cancelled. Still, the `unsupervised` method returns (the scope completes) only once all forks have completed. Fork failures aren't handled in any special way, and can be inspected using the `Fork.join()` method. For helper methods, the capability that needs to be passed is `OxUnsupervised`, a subtype of `Ox` that only allows starting unsupervised forks. ## Cancelling forks By default, forks are not cancellable by the user. Instead, all outstanding forks are cancelled (interrupted) when the enclosing scope ends. If needed, a cancellable fork can be created using `forkCancellable`. However, such an operation is more expensive, as it involves creating a nested scope and two virtual threads, instead of one. The `CancellableFork` trait exposes the `.cancel` method, which interrupts the fork and awaits its completion. Alternatively, `.cancelNow` returns immediately. In any case, the enclosing scope will only complete once all forks have completed. ## Customizing thread creation By default, for each fork a new virtual thread is created using `Thread.ofVirtual().factory()`.
This can be customized globally using `oxThreadFactory`. ## Integrating structured concurrency with other libraries When integrating with libraries which manage their own threads, e.g. reactive libraries, it is sometimes necessary to be able to start forks from such library-managed threads. For this purpose, the `inScopeRunner` method can be used. A runner obtained from a scope-managed thread can be passed to other threads, and used to start forks within the concurrency scope from arbitrary threads, as long as the scope is not complete. # Fork locals `ForkLocal`s replace usages of `ThreadLocal` when using Ox's forks and structured concurrency. They are useful to propagate auxiliary context, e.g. trace or correlation ids. A fork local needs to be first created with a default value. Then, its value can be set within a new [scope](fork-join.md). Usually, a new supervised scope is created, within which the `ForkLocal` is set to the given value - but only within that scope, as long as it's not completed. Hence, values are bound structurally: ```scala import ox.{ForkLocal, fork, supervised} val v = ForkLocal("a") supervised { println(v.get()) // "a" fork { v.supervisedWhere("x") { println(v.get()) // "x" fork { println(v.get()) // "x" }.join() } }.join() println(v.get()) // "a" } ``` Scoped values propagate across nested scopes. ```{note} Due to the "structured" nature of setting a fork local's value, forks using external (wider) scopes should not be created within a block where a fork local is set. An attempt to do so will throw a `java.util.concurrent.StructureViolationException`. ``` ## Creating helper functions which set fork locals If you're writing a helper function which sets a value of a fork local within a passed code block, you have to make sure that the code block doesn't accidentally capture the outer concurrency scope (leading to an exception on the first `fork`).
This can be done by capturing the code block as a context function `Ox ?=> T`, so that any nested invocations of `fork` will use the provided instance, not the outer one. E.g.: ```scala def withSpan[T](spanName: String)(f: Ox ?=> T): T = val span = spanBuilder.startSpan(spanName) currentSpan.supervisedWhere(Some(span)) { try f finally span.end() } ``` ## Implementation notes `ForkLocal`s are based on an immutable map passed via a `ThreadLocal`, when a fork is started or a value set. The implementation will instead rely on `ScopedValue`s, which are part of [JEP 506](https://openjdk.org/jeps/506), once both scoped values and structured concurrency are available as stable features in an LTS Java release. # Interruptions When catching exceptions, care must be taken not to catch & fail to propagate an `InterruptedException`. Doing so will prevent the scope cleanup mechanisms from making progress, as the scope won't finish until all started threads complete. A good solution is to catch only non-fatal exceptions using `NonFatal`, e.g.: ```scala import ox.{forever, fork, supervised} import org.slf4j.LoggerFactory import scala.util.control.NonFatal val logger = LoggerFactory.getLogger(this.getClass) def processSingleItem(): Unit = () supervised { fork { forever { try processSingleItem() catch case NonFatal(e) => logger.error("Processing error", e) } } // do something else } ``` # Streaming APIs Ox provides two complementary APIs for defining streaming data transformation pipelines. The first API uses a **functional style**, implemented as [flows](flows.md). A flow lets you stack multiple data transformations using high-level methods such as `map`, `mapPar`, `grouped`, `async`, `merge` and more. For example: ```scala import ox.channels.BufferCapacity import ox.flow.* def invokeService(n: Int): String = ???
def sendParsedNumbers(incoming: Flow[String])(using BufferCapacity): Unit = incoming .mapConcat(_.split(" ").flatMap(_.toIntOption)) .tap(n => println(s"Got: $n")) .mapPar(8)(invokeService) .runForeach(r => println(s"Result: $r")) ``` A flow **describes** the operations to perform; only when one of its `run` methods is invoked does actual data processing start. That is, a flow is lazily-evaluated. As part of implementing the individual transformation stages of a flow, channels, concurrency scopes and forks are used. Flows are a high-level API, built on top of channels (see below) and [forks](../structured-concurrency/fork-join.md). The second API is lower-level, uses an **imperative style** and is implemented using [channels](channels.md). As part of the code which defines how the data should be transformed, you can use the (blocking) `receive()` and `send()` methods on channels. You'll also often directly use [`Ox` concurrency scopes](../structured-concurrency/index.md) and [`fork`s](../structured-concurrency/fork-join.md). For example: ```scala import ox.* import ox.channels.* def parseNumbers(incoming: Source[String])(using Ox, BufferCapacity): Source[Int] = val results = BufferCapacity.newChannel[Int] forkPropagate(results) { repeatWhile: incoming.receiveOrClosed() match case ChannelClosed.Done => results.doneOrClosed(); false case ChannelClosed.Error(r) => results.errorOrClosed(r); false case t: String => t.split(" ").flatMap(_.toIntOption).foreach: n => println(s"Got: $n") results.send(n); true } results ``` While channels implement a "hot streams" approach to defining data transformation pipelines, flows correspond to "cold streams". You can use both approaches in a single pipeline, depending on which approach better fits the task at hand. It's straightforward to convert a channel to a flow, and to run a flow to a channel. # Backpressure Channels and running flows are back-pressured.
The `Channel.send` operation blocks until a receiver thread is available, or until there's enough space in the buffer. The processing space is hence bounded by the total size of channel buffers. # Channels A channel is like a queue (values can be sent/received), but additionally channels support: * completion (a source can be `done`) * downstream error propagation * `select`ing exactly one channel clause to complete, where clauses include send and receive operations Creating a channel is a light-weight operation: ```scala import ox.channels.* val c = Channel.bufferedDefault[String] ``` This uses the default buffer size (16). It's also possible to create channels with other buffer sizes, as well as rendezvous or unlimited channels: ```scala import ox.channels.* val c1 = Channel.rendezvous[String] val c2 = Channel.buffered[String](5) val c3 = Channel.unlimited[String] ``` In rendezvous channels, a sender and receiver must "meet" to exchange a value. Hence, `.send` always blocks, unless there's another thread waiting on a `.receive`. In buffered channels, `.send` only blocks when the buffer is full. In an unlimited channel, sending never blocks. Channels implement two traits: `Source` and `Sink`. ## Sinks Data can be sent to a channel using `Sink.send`. Once no more data items are available, completion can be signalled downstream using `Sink.done`. If there's an error when producing data, this can be signalled using `Sink.error`: ```scala import ox.{fork, supervised} import ox.channels.* val c = Channel.rendezvous[String] supervised: fork: c.send("Hello") c.send("World") c.done() // TODO: receive ``` `.send` blocks the thread, hence usually channels are shared across forks to communicate data between them. As Ox is designed to work with Java 21+ and Virtual Threads, blocking is a cheap operation that might be done frequently. ## Sources A source can be used to receive elements from a channel.
```scala trait Source[+T]: def receive(): T ``` Same as `.send`, the `.receive` method might block the current thread. ### Creating sources Sources can be created by instantiating a new channel. There are also some basic factory methods on the `Source` companion object. Finally, a [flow](flows.md) can be run to a channel if needed, e.g.: ```scala import ox.supervised import ox.channels.Source import ox.flow.Flow import scala.concurrent.duration.* supervised: Source.fromValues(1, 2, 3) Flow.tick(1.second, "x").runToChannel() Flow.iterate(0)(_ + 1).runToChannel() // natural numbers ``` Typically, for each source created as shown above a fork is started, which sends the elements to the channel when capacity is available. If the enclosing `supervised` scope ends, each such fork is cancelled. ## Handling closed channels By default, `Sink.send` and `Source.receive` methods will throw a `ChannelClosedException`, if the channel is already closed: ```scala enum ChannelClosedException(cause: Option[Throwable]) extends Exception(cause.orNull): case Error(cause: Throwable) extends ChannelClosedException(Some(cause)) case Done() extends ChannelClosedException(None) ``` Alternatively, you can call `Sink.sendSafe` or `Source.receiveSafe`, which return union types: ```scala trait Source[+T]: def receive(): T def receiveSafe(): T | ChannelClosed trait Sink[-T]: def send(value: T): Unit def sendSafe(value: T): Unit | ChannelClosed def done(): Unit def doneSafe(): Unit | ChannelClosed def error(cause: Throwable): Unit def errorSafe(cause: Throwable): Unit | ChannelClosed sealed trait ChannelClosed object ChannelClosed: case class Error(reason: Option[Exception]) extends ChannelClosed case object Done extends ChannelClosed ``` That is, the result of a `safe` operation might be a value, or information that the channel is closed. 
Using extension methods from `ChannelClosedUnion` it's possible to convert such union types to `Either`s, `Try`s or exceptions, as well as `map` over such results. # Error propagation Errors are only propagated downstream, ultimately reaching the point where the flow is run / source is discharged. This leads to an exception being thrown there. When running flows, any [scopes](../structured-concurrency/fork-join.md) started as part of executing the flow's stages should have completed, before the exception is re-thrown by the `run...` method. For channel-transforming operations, once the exception reaches the enclosing scope, any forks should become interrupted, including any that are still running and are handling the upstream processing stages. The approach we decided to take (only propagating errors downstream) is one of the two possible designs - with the other being re-throwing an exception when it's encountered. Please see [the respective ADR](../adr/0001-error-propagation-in-channels.md) for a discussion. # Flows A `Flow[T]` describes an asynchronous data transformation pipeline. When run, it emits elements of type `T`. Flows are lazy: evaluation (and any effects) happens only when the flow is run. Flows might be finite or infinite; in the latter case running a flow never ends normally; it might be interrupted, though. Finally, any exceptions that occur when evaluating the flow's logic will be thrown when running the flow, after any cleanup logic completes. ```{note} An introduction to Ox's Flow, along with some code samples is available [as a video](https://www.youtube.com/watch?v=2sZGVRXP9PM).
``` As a companion to this guide, refer to the ScalaDoc for a comprehensive list of: * [ways to create Flows](https://www.javadoc.io/static/com.softwaremill.ox/core_3/1.0.5/ox/flow/FlowCompanionOps.html#) * [ways to transform Flows](https://www.javadoc.io/static/com.softwaremill.ox/core_3/1.0.5/ox/flow/FlowOps.html#) * [ways to run Flows](https://www.javadoc.io/static/com.softwaremill.ox/core_3/1.0.5/ox/flow/FlowRunOps.html#) ## Creating flows There's a number of methods on the `Flow` companion object that can be used to create a flow: ```scala import ox.flow.Flow import scala.concurrent.duration.* Flow.fromValues(1, 2, 3) // a finite flow Flow.tick(1.second, "x") // an infinite flow, emitting "x" every second Flow.iterate(0)(_ + 1) // natural numbers ``` Note that creating a flow as above doesn't emit any elements, or execute any of the flow's logic. Only when run, the elements are emitted and any effects that are part of the flow's stages happen. Flows can also be created using [channel](channels.md) `Source`s: ```scala import ox.channels.Channel import ox.flow.Flow import ox.{fork, supervised} val ch = Channel.bufferedDefault[Int] supervised: fork: ch.send(1) ch.send(15) ch.send(-2) ch.done() Flow.fromSource(ch) // TODO: transform the flow further & run ``` Finally, flows can be created by providing arbitrary element-emitting logic: ```scala import ox.flow.Flow def isNoon(): Boolean = ??? Flow.usingEmit: emit => emit(1) for i <- 4 to 50 do emit(i) if isNoon() then emit(42) ``` The `emit: FlowEmit` instance is used to emit elements by the flow, that is process them further, as defined by the downstream pipeline. This method only completes once the element is fully processed, and it might throw exceptions in case there's a processing error. As part of the callback, you can create [supervision scopes](../structured-concurrency/error-handling-scopes.md), fork background computations or run other flows asynchronously. 
However, take care **not** to share the `emit: FlowEmit` instance across threads. That is, instances of `FlowEmit` are thread-unsafe and should only be used on the calling thread. The lifetime of `emit` should not extend beyond the duration of the invocation of `usingEmit`. Any asynchronous communication is best done with [channels](channels.md). You can then manually forward any elements received from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`. Alternatively, flows can be created by providing a function that receives a `Sink` (channel): ```scala import ox.flow.Flow import ox.{fork, supervised} supervised: Flow.usingChannel: sink => sink.send(1) fork: sink.send(2) sink.send(3) sink.send(4) // TODO: transform the flow further & run ``` Unlike `usingEmit`, the `Sink` instance can be safely shared across threads, as channels are thread-safe. The provided function is run asynchronously in a forked task. The flow completes when the function completes and the sink is automatically closed. If the function throws an exception, it is propagated as a flow error. Note that `Flow.usingChannel` must be run within a concurrency scope, as it creates a fork to run the provided function. ## Transforming flows: basics Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance, describing the extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a number of pre-defined transformation stages, many of them similar in function to corresponding methods on Scala's collections: ```scala import ox.flow.Flow Flow.fromValues(1, 2, 3, 5, 6) .map(_ * 2) .filter(_ % 2 == 0) .take(3) .zip(Flow.repeat("a number")) .interleave(Flow.repeat((0, "also a number"))) // etc., TODO: run the flow ``` You can also define arbitrary element-emitting logic for each incoming element using `.mapUsingEmit`, similarly to `Flow.usingEmit` above.
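The laziness of transformation stages can be made visible with a side-effecting counter. The sketch below uses only `Flow.fromValues`, `.map` and `.runToList`; the helper `lazinessDemo` and the counter are illustrative names, not part of Ox.

```scala
import ox.flow.Flow
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns (invocations before run, result, invocations after run).
def lazinessDemo(): (Int, List[Int], Int) =
  // counts how many times the mapping function has been invoked
  val invocations = new AtomicInteger(0)

  // describing the pipeline runs nothing: `map` only returns a new Flow
  val doubled: Flow[Int] = Flow.fromValues(1, 2, 3).map { n =>
    invocations.incrementAndGet()
    n * 2
  }
  val beforeRun = invocations.get()

  // only running the flow pushes elements through the `map` stage
  val result = doubled.runToList()
  (beforeRun, result, invocations.get())
```

The mapping function is not invoked while the pipeline is being described; it runs once per element only when `runToList()` is called.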
## Running flows Flows have to be run for any processing to happen. This can be done with one of the `.run...` methods. For example: ```scala import ox.flow.Flow import scala.concurrent.duration.* Flow.fromValues(1, 2, 3).runToList() // List(1, 2, 3) Flow.fromValues(1, 2, 3).runForeach(println) Flow.tick(1.second, "x").runDrain() // never finishes ``` Running a flow is a blocking operation. Unless asynchronous boundaries are present (explicit or implicit, more on this below), the entire processing happens on the calling thread. For example, the following pipeline: ```scala import ox.flow.Flow Flow.fromValues(1, 2, 3, 5, 6) .map(_ * 2) .filter(_ % 2 == 0) .runToList() ``` processes the elements one by one on the thread that invokes the run method. ## Transforming flows: concurrency A number of flow transformations introduce asynchronous boundaries. For example, `.mapPar(parallelism)(mappingFunction)` describes a flow, which runs the pipeline defined so far in the background, emitting elements to a [channel](channels.md). Another [fork](../structured-concurrency/fork-join.md) reads these elements and runs up to `parallelism` invocations of `mappingFunction` concurrently. Mapped elements are then emitted by the returned flow. Behind the scenes, an `Ox` concurrency scope is created along with a number of forks. In case of any exceptions, everything is cleaned up before the flow propagates the exceptions. The `.mapPar` logic ensures that any exceptions from the preceding pipeline are propagated through the channel. Some other stages which introduce concurrency include `.merge`, `.interleave`, `.groupedWithin` and [I/O](io.md) stages. The created channels serve as buffers between the pipeline stages, and their capacity is defined by the `BufferCapacity` in scope (a default instance is available, if not provided explicitly). Explicit asynchronous boundaries can be inserted using `.buffer()`.
This might be useful if producing the next element to emit and consuming the previous one should run concurrently; or if the processing times of the consumer vary, and the producer should buffer up elements. ## Interoperability with channels Flows can be created from channels, and run to channels. For example: ```scala import ox.Ox import ox.channels.{BufferCapacity, Source} import ox.flow.Flow def transformChannel(ch: Source[String])(using Ox, BufferCapacity): Source[Int] = Flow.fromSource(ch) .mapConcat(_.split(" ")) .mapConcat(_.toIntOption) .filter(_ % 2 == 0) .runToChannel() ``` The method above needs to be run within a concurrency scope, as `.runToChannel()` creates a background fork which runs the pipeline described by the flow, and emits its elements onto the returned channel. ## Text transformations When dealing with flows of `Chunk[Byte]` or `String`s, you can leverage the following built-in combinators for useful transformations: * `encodeUtf8` encodes a `Flow[String]` into a `Flow[Chunk[Byte]]` * `linesUtf8` decodes a `Flow[Chunk[Byte]]` into a `Flow[String]`. Assumes that the input represents text with line breaks. The `String` elements emitted by the resulting `Flow[String]` represent text lines. * `decodeStringUtf8` decodes a `Flow[Chunk[Byte]]` into a `Flow[String]`, without handling line breaks, just processing input bytes as UTF-8 characters, even if a multi-byte character is divided into two chunks. Such operations may be useful when dealing with I/O like files, `InputStream`, etc. See [I/O](io.md). ## Logging Ox does not have any integrations with logging libraries, but it provides a simple way to log elements emitted by flows using the `.tap` method: ```scala import ox.flow.Flow Flow.fromValues(1, 2, 3) .tap(n => println(s"Received: $n")) .runToList() ``` ## Reactive streams interoperability ### Flow -> Publisher A `Flow` can be converted to a `java.util.concurrent.Flow.Publisher` using the `.toPublisher` method.
This needs to be run within an `Ox` concurrency scope, as upon subscribing, a fork is created to run the publishing process. Hence, the scope should remain active as long as the publisher is used. Internally, elements emitted by the flow are buffered, using a buffer of capacity given by the `BufferCapacity` in scope. To obtain a `org.reactivestreams.Publisher` instance, you'll need to add the following dependency and import, to bring the `toReactiveStreamsPublisher` method into scope: ```scala // sbt dependency: "com.softwaremill.ox" %% "flow-reactive-streams" % "1.0.5" import ox.supervised import ox.flow.Flow import ox.flow.reactive.* val myFlow: Flow[Int] = ??? supervised: myFlow.toReactiveStreamsPublisher: org.reactivestreams.Publisher[Int] // use the publisher ``` ### Publisher -> Flow A `java.util.concurrent.Flow.Publisher` can be converted to a `Flow` using `Flow.fromPublisher`. Internally, elements published to the subscription are buffered, using a buffer of capacity given by the `BufferCapacity` in scope. That's also how many elements will be at most requested from the publisher at a time. To convert a `org.reactivestreams.Publisher` instance, you'll need the same dependency as above and call the `FlowReactiveStreams.fromPublisher` method. # Flows: files and I/O Ox allows creating a `Flow` which reads from a file or `InputStream`, as well as running a flow into a file or an `OutputStream`. The latter methods are available for `Flow[Chunk[Byte]]`. Ox takes care of closing files/streams after processing and on errors. 
## InputStream and OutputStream ### Flow.fromInputStream A `Flow[Chunk[Byte]]` can be created using an `InputStream`: ```scala import ox.flow.Flow import java.io.ByteArrayInputStream import java.io.InputStream val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) Flow .fromInputStream(inputStream) // Flow[Chunk[Byte]] .decodeStringUtf8 .map(_.toUpperCase) .runForeach(println) // "SOME INPUT" ``` You can define a custom chunk size instead of using the default: ```scala import ox.flow.Flow import java.io.ByteArrayInputStream import java.io.InputStream val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) Flow .fromInputStream(inputStream, chunkSize = 4) // Flow[Chunk[Byte]] .decodeStringUtf8 .map(_.toUpperCase) .runForeach(println) // "SOME", " INP", "UT" ``` ### flow.toOutputStream A `Flow[Chunk[Byte]]` can be run to write to an `OutputStream`: ```scala import ox.flow.Flow import java.io.ByteArrayOutputStream val outputStream = new ByteArrayOutputStream() val flow = Flow.fromIterable(List("text1,", "text2")) flow .encodeUtf8 .runToOutputStream(outputStream) outputStream.toString // "text1,text2" ``` ## Files ### Flow.fromFile You can obtain a `Flow` of byte chunks read from a file for a given path: ```scala import ox.flow.Flow import java.nio.file.Paths Flow .fromFile(Paths.get("/path/to/my/file.txt")) .linesUtf8 .map(_.toUpperCase) .runToList() // List("FILE_LINE1", "FILE_LINE2") ``` Similarly to `.fromInputStream`, you can define a custom chunk size using `Flow.fromFile(path: Path, chunkSize: Int)`.
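As a rough illustration of the chunk-size variant, the sketch below writes a temporary file, reads it back in 4-byte chunks and reassembles the text. It assumes the two-argument `Flow.fromFile(path, chunkSize)` form mentioned above; the helper name `readInSmallChunks` and the temporary file prefix are illustrative.

```scala
import ox.flow.Flow
import java.nio.file.{Files, Path}

// Hypothetical helper: round-trips `content` through a temporary file,
// reading it back as a flow of small byte chunks.
def readInSmallChunks(content: String): String =
  val path: Path = Files.createTempFile("ox-flow-demo", ".txt")
  try
    Files.writeString(path, content)
    Flow
      .fromFile(path, 4) // each emitted chunk holds at most 4 bytes
      .decodeStringUtf8 // decode each chunk to a String
      .runToList()
      .mkString // glue the decoded pieces back together
  finally Files.deleteIfExists(path)
```

A small chunk size mostly matters for memory use and latency; the decoded text should be the same regardless of how the bytes are split.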
### flow.toFile A `Flow[Chunk[Byte]]` can be written to a file under a given path: ```scala import ox.flow.Flow import java.nio.file.Paths Flow.fromIterable(List("text1,", "text2")) .encodeUtf8 .runToFile(Paths.get("/path/to/my/target/file.txt")) ``` # Selecting from channels Channels are distinct from queues in that they support a `select` method, which takes a number of channel clauses, and blocks until one of the clauses is satisfied. The other channels are left intact (no values are sent or received). Channel clauses include: * `Source.receiveClause` - to receive a value from the channel * `Sink.sendClause(value)` - to send a value to a channel * `Default(value)` - to return the given value from the `select`, if no other clause can be immediately satisfied ## Receiving exactly one value from multiple channels The most common use-case for `select` is to receive exactly one value from a number of channels. There's a dedicated `select` variant for this use-case, which accepts a number of `Source`s, for which receive clauses are created. The signature for the two-source variant of this method is: ```scala def select[T1, T2](source1: Source[T1], source2: Source[T2]): T1 | T2 ``` As an example, this can be used as follows: ```scala import ox.supervised import ox.channels.* import ox.flow.Flow import scala.annotation.tailrec import scala.concurrent.duration.* case object Tick def consumer(strings: Source[String]): Nothing = supervised { val tick = Flow.tick(1.second, Tick).runToChannel() @tailrec def doConsume(acc: Int): Nothing = select(tick, strings) match case Tick => println(s"Characters received this second: $acc") doConsume(0) case s: String => doConsume(acc + s.length) doConsume(0) } ``` Selects are biased towards clauses/sources that appear first in the argument list. To achieve fairness, you might want to randomize the ordering of the clauses/sources.
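One possible way to randomize the ordering is to shuffle the sources before each call. The sketch below assumes the `select` overload that accepts a sequence of sources; `selectFair` is a hypothetical helper, not part of Ox.

```scala
import ox.channels.{Channel, Source, select}
import scala.util.Random

// Hypothetical helper (not part of Ox): shuffles the sources before every
// select, so that no single source is consistently preferred.
// Assumes the `select` overload taking a `Seq[Source[T]]`.
def selectFair[T](sources: Seq[Source[T]]): T =
  select(Random.shuffle(sources))
```

For example, with two buffered channels that both hold a value, repeated calls to `selectFair(Seq(a, b))` would be expected to pick from either channel, instead of always preferring `a`.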
## Mixed receive and send clauses The `select` method can also be used to send a value to exactly one channel, or with mixed receive and send clauses. It is guaranteed that exactly one clause will be satisfied (either a value sent, or received from exactly one of the channels). For example: ```scala import ox.channels.{Channel, select} val c = Channel.rendezvous[Int] val d = Channel.rendezvous[Int] select(c.sendClause(10), d.receiveClause) ``` The above will block until a value can be sent to `c` (as this is an unbuffered channel, for this to happen there must be a concurrently running `receive` call), or until a value can be received from `d`. The type returned by the above invocation is: ```scala c.Sent | d.Received ``` Note that the `Sent` and `Received` types are inner types of the `c` and `d` values. For different channels, the `Sent` / `Received` instances will have distinct classes, hence allowing distinguishing which clause has been satisfied. Channel closed values can be inspected, or converted to an exception using `.orThrow`. The results of a `select` can be inspected using a pattern match: ```scala import ox.channels.* val c = Channel.rendezvous[Int] val d = Channel.rendezvous[Int] select(c.sendClause(10), d.receiveClause) match case c.Sent() => println("Sent to c") case d.Received(v) => println(s"Received from d: $v") ``` If there's a missing case, the compiler will warn you that the `match` is not exhaustive, and give you a hint as to what is missing. Similarly, there will be a warning in case of an unneeded, extra match case. ## Closed channels (done / error) If any of the channels is, or becomes, closed (in an error state / done), `select` throws a `ChannelClosedException` with the details of the error / done state.
As with `send` and `receive`, there's a `safe` variant for each `select` method overload, which returns a union type, e.g.: ```scala def selectSafe[T1, T2](source1: Source[T1], source2: Source[T2]): T1 | T2 | ChannelClosed ``` It is possible to inspect which channel is in a closed state by using the `.isClosedForSend` and `.isClosedForReceive` methods (plus detailed variants). ## Default clauses A default clause can be provided, which specifies the return value of the `select`, in case no other clause can be immediately satisfied. The clause can be created with `Default`, and in case the value is used, it is returned wrapped in `DefaultResult`. For example: ```scala import ox.channels.* val c = Channel.rendezvous[Int] select(c.receiveClause, Default(5)) match case c.Received(v) => println(s"Received from c: $v") case DefaultResult(v) => println(s"No value available in c, using default: $v") ``` There can be at most one default clause in a `select` invocation. ## Select with timeout For scenarios where you want to wait for a channel operation to complete, but only for a limited time, Ox provides two timeout-enabled select variants: `selectOrClosedWithin` and `selectWithin`.
### selectOrClosedWithin `selectOrClosedWithin` allows you to specify a timeout value that will be returned if no clauses can complete within the given duration: ```scala import ox.channels.* import ox.supervised import scala.concurrent.duration.* supervised: val c1 = Channel.rendezvous[Int] val c2 = Channel.rendezvous[String] // Returns "timeout" if no clause completes within 100ms val result = selectOrClosedWithin(100.millis, "timeout")(c1.receiveClause, c2.receiveClause) result match case c1.Received(value) => println(s"Received from c1: $value") case c2.Received(value) => println(s"Received from c2: $value") case "timeout" => println("Operation timed out") case closed: ChannelClosed => println(s"Channel closed: $closed") ``` You can also use `selectOrClosedWithin` directly with sources: ```scala import ox.channels.* import ox.supervised import scala.concurrent.duration.* supervised: val s1 = Channel.rendezvous[Int] val s2 = Channel.rendezvous[String] // Returns -1 if no source has a value within 50ms val result = selectOrClosedWithin(50.millis, -1)(s1, s2) result match case value: Int if value == -1 => println("Timeout occurred") case value: Int => println(s"Received int: $value") case value: String => println(s"Received string: $value") case closed: ChannelClosed => println(s"Channel closed: $closed") ``` ### selectWithin `selectWithin` is similar to `selectOrClosedWithin`, but instead of returning a timeout value, it throws a `TimeoutException` when the timeout is reached: ```scala import ox.channels.* import ox.supervised import scala.concurrent.duration.* import scala.concurrent.TimeoutException supervised: val c1 = Channel.rendezvous[Int] val c2 = Channel.rendezvous[String] try val result = selectWithin(100.millis)(c1.receiveClause, c2.receiveClause) result match case c1.Received(value) => println(s"Received from c1: $value") case c2.Received(value) => println(s"Received from c2: $value") catch case _: TimeoutException => println("Operation timed out") ``` 
Similarly with sources: ```scala import ox.channels.* import ox.supervised import scala.concurrent.duration.* import scala.concurrent.TimeoutException supervised: val s1 = Channel.rendezvous[Int] val s2 = Channel.rendezvous[String] try val result = selectWithin(50.millis)(s1, s2) println(s"Received: $result") catch case _: TimeoutException => println("No data available within timeout") ``` ### When to use which variant - Use `selectOrClosedWithin` when you want to provide a default value or handle timeouts as part of your normal flow - Use `selectWithin` when you want a timeout to be treated as an exceptional condition that should interrupt normal flow Both variants support all the same overloads as regular `select`: single clauses, multiple clauses (up to 5), and sequences of clauses or sources. # Transforming channels ## Transforming eagerly Sources can be transformed by receiving values, manipulating them and sending to other channels - this provides the highest flexibility and allows creating arbitrary channel topologies. Some basic channel-transforming operations are available as methods on `Source`. For example: ```scala import ox.supervised import ox.channels.{Channel, Source} supervised: val c = Channel.rendezvous[String] val c2: Source[Int] = c.map(s => s.length()) ``` The `.map` needs to be run within a scope, as it starts a new virtual thread (using `fork`), which: * immediately starts receiving values from the given source * applies the given function * sends the result to the new channel The new channel is returned to the user as the return value of `.map`.
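The three steps listed above can be sketched by hand, which also illustrates the "arbitrary channel topologies" approach. This is a minimal sketch, not the library's implementation of `.map`: the helper name `mapByHand` and the buffer size of 16 are assumptions made for illustration, and unlike the built-in method it does not turn exceptions thrown by `f` into a channel error.

```scala
import ox.{fork, supervised, Ox}
import ox.channels.{Channel, ChannelClosed, Source}

// Hypothetical helper (not part of Ox): a hand-written equivalent of `.map`.
def mapByHand[T, U](source: Source[T])(f: T => U)(using Ox): Source[U] =
  val out = Channel.buffered[U](16)
  // the fork starts receiving immediately, in the background
  fork:
    var running = true
    while running do
      source.receiveOrClosed() match
        case ChannelClosed.Done =>
          out.done() // upstream finished: complete the new channel
          running = false
        case ChannelClosed.Error(reason) =>
          out.error(reason) // upstream failed: propagate the error downstream
          running = false
        case value =>
          // any other value is an element of type T
          out.send(f(value.asInstanceOf[T]))
  out

def lengths(): List[Int] =
  supervised:
    mapByHand(Source.fromValues("a", "bb", "ccc"))(_.length).toList
```

Because the fork is started inside the enclosing scope, the scope does not complete until the forwarding loop has finished, which is why the helper requires an `Ox` capability.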
To run multiple transformations within one virtual thread / fork, the `.transform` method is available: ```scala import ox.supervised import ox.channels.Source supervised: Source.fromIterable(1 to 1000) .transform(_.filter(_ % 2 == 0).map(_ + 1).take(10)) // take the 10 first even numbers, incremented by 1 .foreach(n => println(n.toString)) ``` ```{note} For more advanced transformation options, use [flows](flows.md). ``` ## Capacity of transformation stages Most source and some flow transformation methods create new channels, on which the transformed values are produced. The capacity of these channels by default is 16 (buffered). This can be overridden by providing a `BufferCapacity` given, e.g.: ```scala (v: Source[Int]).map(_ + 1)(using BufferCapacity(10)) ``` ## Discharging channels Values of a source can be discharged using methods such as `.foreach`, `.toList`, `.pipeTo` or `.drain`: ```scala import ox.supervised import ox.channels.Source supervised: val s = Source.fromValues(1, 2, 3) s.toList: List[Int] // List(1, 2, 3) ``` These methods are blocking, as they drain the channel until no more values are available (when the channel is done). ### Closed channels (done / error) If the channel encounters an error, the discharging method will throw a `ChannelClosedException.Error`. As with `send` and `receive`, there's a `safe` variant for each discharging method, which returns a union type, e.g.: ```scala import ox.supervised import ox.channels.{ChannelClosed, Source} supervised: val s = Source.fromValues(1, 2, 3) s.toList: List[Int] | ChannelClosed.Error // List(1, 2, 3) ``` # Retries The retries mechanism allows retrying a failing operation according to a given configuration (e.g. retry 3 times with a 100ms delay between attempts). ## API The basic syntax for retries is: ```scala import ox.resilience.retry retry(schedule)(operation) ``` The `retry` API uses `scheduled` underneath, with a DSL focused on retries.
See [scheduled](scheduled.md) for more details. ## Operation definition The `operation` can be provided directly using a by-name parameter, i.e. `f: => T`. There's also a `retryEither` variant which accepts a by-name `Either[E, T]`, i.e. `f: => Either[E, T]`, as well as one which accepts arbitrary [error modes](../basics/error-handling.md), accepting the computation in an `F` context: `f: => F[T]`. ## Configuration Retries can be configured using a `RetryConfig` instance, which consists of three parts: - a `Schedule`, which indicates how many times and with what delay the `operation` should be retried after an initial failure, - a `ResultPolicy`, which indicates whether: - a non-erroneous outcome of the `operation` should be considered a success (if not, the `operation` would be retried), - an erroneous outcome of the `operation` should be retried or fail fast. - an `onRetry` callback, which is invoked after each attempt to execute the operation. It is used to perform any necessary actions or checks after each attempt, regardless of whether the attempt was successful or not. For convenience, the `retry` method accepts either a full `RetryConfig`, or just the `Schedule`. In the latter case, a default `RetryConfig` is created, using the schedule.
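As an illustration of the three parts working together, here is a minimal sketch that passes a full `RetryConfig`. It assumes that the callback can be supplied as a named `onRetry` constructor parameter, as the description above suggests; the helper `retryWithLogging` and the `flaky` operation are hypothetical.

```scala
import ox.resilience.{retry, ResultPolicy, RetryConfig}
import ox.scheduling.Schedule
import scala.concurrent.duration.*

// Hypothetical helper: the operation fails twice, then succeeds.
def retryWithLogging(): (Int, Int) =
  var calls = 0
  def flaky(): Int =
    calls += 1
    if calls < 3 then throw new RuntimeException(s"failure #$calls") else 42

  val config = RetryConfig[Throwable, Int](
    Schedule.fixedInterval(10.millis).maxRetries(5),
    ResultPolicy.default[Throwable, Int],
    // Assumption: the callback is exposed as a named `onRetry` parameter.
    onRetry = (attempt, result) => println(s"attempt $attempt finished with $result")
  )

  // returns the final result together with the number of invocations
  (retry(config)(flaky()), calls)
```

When only the schedule matters, `retry(Schedule.fixedInterval(10.millis).maxRetries(5))(flaky())` is the shorter form described above.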
### Schedules Schedules can be created using one of the following methods on the companion object, including: - `Schedule.immediate`, - `Schedule.fixedInterval(interval: FiniteDuration)`, - `Schedule.intervals(intervals: FiniteDuration*)`, - `Schedule.exponentialBackoff(initialDelay: FiniteDuration)`, - `Schedule.fibonacciBackoff(initialDelay: FiniteDuration)`, - `Schedule.decorrelatedJitter(min: FiniteDuration)`, - `Schedule.computed[S](initialState: S, compute: S => (S, Option[FiniteDuration]))` The schedules can be then customized using methods which include the following: - `.maxRetries(maxRetries: Int)` - specifies the number of retries after the initial attempt - `.maxAttempts(maxAttempts: Int)` - specifies the total number of invocations - `.maxInterval(maxInterval: FiniteDuration)` - `.jitter(jitter: Jitter)` - `.maxRetriesByCumulativeDelay(upTo: FiniteDuration)` - `.withInitialDelay(interval: FiniteDuration)` - `.withNoInitialDelay()` - `.andThen(other: Schedule)` See [scheduled](scheduled.md) for details on how to create custom schedules. ### Result policies A result policy allows to customize how the results of the `operation` are treated. It consists of two predicates: - `isSuccess: T => Boolean` (default: `true`) - determines whether a non-erroneous result of the `operation` should be considered a success. When it evaluates to `true` - no further attempts would be made, otherwise - we'd keep retrying. With finite schedules (i.e. those with `maxRetries` defined), if `isSuccess` keeps returning `false` when `maxRetries` are reached, the result is returned as-is, even though it's considered "unsuccessful", - `isWorthRetrying: E => Boolean` (default: `true`) - determines whether another attempt would be made if the `operation` results in an error `E`. When it evaluates to `true` - we'd keep retrying, otherwise - we'd fail fast with the error. The `ResultPolicy[E, T]` is generic both over the error (`E`) and result (`T`) type. 
Note, however, that for the direct variant `retry`, the error type `E` is fixed to `Throwable`, while for the `Either` and error-mode variants, `E` can be an arbitrary type. If you want to customize a part of the result policy, you can use the following shorthands: - `ResultPolicy.default[E, T]` - uses the default settings, - `ResultPolicy.successfulWhen[E, T](isSuccess: T => Boolean)` - uses the default `isWorthRetrying` and the provided `isSuccess`, - `ResultPolicy.retryWhen[E, T](isWorthRetrying: E => Boolean)` - uses the default `isSuccess` and the provided `isWorthRetrying`, - `ResultPolicy.neverRetry[E, T]` - uses the default `isSuccess` and fails fast on any error. ### On retry The callback function has the following signature: ``` (Int, Either[E, T]) => Unit ``` Where: - The first parameter, an `Int`, represents the attempt number of the retry operation. - The second parameter is an `Either[E, T]` type, representing the result of the retry operation. Left represents an error and Right represents a successful result. ## Examples ```scala import ox.UnionMode import ox.resilience.{retry, retryEither, retryWithErrorMode, ResultPolicy, RetryConfig} import ox.scheduling.{Jitter, Schedule} import scala.concurrent.duration.* def directOperation: Int = ??? def eitherOperation: Either[String, Int] = ??? def unionOperation: String | Int = ???
// various operation signatures/error modes - same syntax retry(Schedule.immediate.maxRetries(3))(directOperation) retryEither(Schedule.immediate.maxRetries(3))(eitherOperation) // various configs with custom schedules and default ResultPolicy retry(Schedule.fixedInterval(100.millis).maxRetries(3))(directOperation) retry(Schedule.exponentialBackoff(100.millis).maxRetries(3))(directOperation) retry(Schedule.exponentialBackoff(100.millis).maxRetries(3).jitter().maxInterval(5.minutes))( directOperation) // infinite retries with a default ResultPolicy retry(Schedule.fixedInterval(100.millis))(directOperation) retry(Schedule.exponentialBackoff(100.millis).jitter(Jitter.Full).maxInterval(5.minutes))( directOperation) // result policies // custom success retry(RetryConfig[Throwable, Int]( Schedule.immediate.maxRetries(3), ResultPolicy.successfulWhen(_ > 0)))(directOperation) // fail fast on certain errors retry(RetryConfig[Throwable, Int]( Schedule.immediate.maxRetries(3), ResultPolicy.retryWhen(_.getMessage != "fatal error")))( directOperation) retryEither(RetryConfig( Schedule.immediate.maxRetries(3), ResultPolicy.retryWhen(_ != "fatal error")))( eitherOperation) // custom error mode retryWithErrorMode(UnionMode[String])(RetryConfig[String, Int]( Schedule.immediate.maxRetries(3), ResultPolicy.retryWhen(_ != "fatal error")))( unionOperation) ``` See the tests in `ox.resilience.*` for more. ## Adaptive retries A retry strategy, backed by a token bucket. Every retry costs a certain amount of tokens from the bucket, and every success causes some tokens to be added back to the bucket. If there are not enough tokens, retry is not attempted. This way retries don't overload a system that is down due to a systemic failure (such as a bug in the code, excessive load etc.): retries will be attempted only as long as there are enough tokens in the bucket, then the load on the downstream system will be reduced so that it can recover. 
In contrast, using a "normal" retry strategy, where every operation is retried up to 3 times, a failure causes the load on the system to increase 4 times. For transient failures (component failure, infrastructure issues etc.), retries still work "normally", as the bucket has enough tokens to cover the cost of multiple retries. ### Inspiration * [`AdaptiveRetryStrategy`](https://github.com/aws/aws-sdk-java-v2/blob/master/core/retries/src/main/java/software/amazon/awssdk/retries/AdaptiveRetryStrategy.java) from `aws-sdk-java-v2` * ["Try again: The tools and techniques behind resilient systems" from re:Invent 2024](https://www.youtube.com/watch?v=rvHd4Y76-fs) ### Configuration To use adaptive retries, create an instance of `AdaptiveRetry`. These instances are thread-safe and are designed to be shared. Typically, a single instance should be used to proxy access to a single constrained resource. `AdaptiveRetry` is parametrized with: * `tokenBucket: TokenBucket`: instances of `TokenBucket` can be shared across multiple instances of `AdaptiveRetry` * `failureCost: Int`: number of tokens that are needed for a retry in case of failure * `successReward: Int`: number of tokens that are added back to the token bucket after a success `RetryConfig` (including the `Schedule` and `ResultPolicy`) is defined the same way as with the "normal" retry mechanism; all the configuration from above also applies here. An instance with the default configuration can be obtained with `AdaptiveRetry.default` (bucket size = 500, cost for failure = 5 and reward for success = 1). ### API `AdaptiveRetry` exposes three variants of retrying, which correspond to the three variants discussed above: `retry`, `retryEither` and `retryWithErrorMode`. `retry` will attempt to retry an operation if it throws an exception; `retryEither` will retry, if the result is a `Left`. Finally `retryWithErrorMode` is the most flexible, and allows retrying operations using custom failure modes (such as union types).
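The token accounting behind adaptive retries can be modelled in a few lines of plain Scala. This is a simplified sketch of the idea only, not Ox's implementation: `TokenBucketModel` and `adaptiveRetry` are hypothetical names, and the default cost of 5 and reward of 1 mirror the defaults quoted above.

```scala
// Hypothetical model of a token bucket: starts full and never exceeds its capacity.
final class TokenBucketModel(capacity: Int):
  private var tokens = capacity

  // takes `n` tokens if available; returns whether the acquisition succeeded
  def tryAcquire(n: Int): Boolean = synchronized {
    if tokens >= n then
      tokens -= n
      true
    else false
  }

  // gives `n` tokens back, capped at the capacity
  def release(n: Int): Unit = synchronized {
    tokens = math.min(capacity, tokens + n)
  }

  def available: Int = synchronized(tokens)

// Hypothetical retry loop: a retry is attempted only if the bucket can pay for it.
def adaptiveRetry[T](bucket: TokenBucketModel, maxRetries: Int, failureCost: Int = 5, successReward: Int = 1)(
    op: () => Either[String, T]
): Either[String, T] =
  @annotation.tailrec
  def loop(retriesLeft: Int): Either[String, T] =
    op() match
      case Right(value) =>
        bucket.release(successReward) // every success refills the bucket a little
        Right(value)
      case Left(error) =>
        if retriesLeft > 0 && bucket.tryAcquire(failureCost) then loop(retriesLeft - 1)
        else Left(error) // out of retries or out of tokens: give up
  loop(maxRetries)
```

With a bucket of 10 tokens and a failure cost of 5, an operation that always fails is invoked three times (the initial attempt plus two paid retries) even if `maxRetries` is higher. Once the bucket is empty, further failures are not retried until successes refill it.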
The methods have an additional parameter, `shouldPayFailureCost`, which determines whether a result `Either[E, T]` should be considered a failure in terms of paying the cost for a retry. The penalty is paid only if it is decided to retry the operation; it is not paid for a successful operation. ### Examples To use this mechanism, run the operation through an instance of `AdaptiveRetry`: ```scala import ox.UnionMode import ox.resilience.AdaptiveRetry import ox.resilience.{ResultPolicy, RetryConfig} import ox.scheduling.{Jitter, Schedule} import scala.concurrent.duration.* def directOperation: Int = ??? def eitherOperation: Either[String, Int] = ??? def unionOperation: String | Int = ??? val adaptive = AdaptiveRetry.default // various configs with custom schedules and default ResultPolicy adaptive.retry(Schedule.immediate.maxRetries(3))(directOperation) adaptive.retry(Schedule.fixedInterval(100.millis).maxRetries(3))(directOperation) adaptive.retry(Schedule.exponentialBackoff(100.millis).maxRetries(3))(directOperation) adaptive.retry(Schedule.exponentialBackoff(100.millis).maxRetries(3).jitter() .maxInterval(5.minutes))(directOperation) // result policies // custom success adaptive.retry(RetryConfig[Throwable, Int]( Schedule.immediate.maxRetries(3), ResultPolicy.successfulWhen(_ > 0)))(directOperation) // fail fast on certain errors adaptive.retry(RetryConfig[Throwable, Int]( Schedule.immediate.maxRetries(3), ResultPolicy.retryWhen(_.getMessage != "fatal error")))( directOperation) adaptive.retryEither(RetryConfig( Schedule.immediate.maxRetries(3), ResultPolicy.retryWhen(_ != "fatal error")))( eitherOperation) // custom error mode adaptive.retryWithErrorMode(UnionMode[String])(RetryConfig[String, Int]( Schedule.immediate.maxRetries(3), ResultPolicy.retryWhen(_ != "fatal error")))( unionOperation) // consider "throttling error" not as a failure that should incur the retry penalty adaptive.retryWithErrorMode(UnionMode[String])(RetryConfig[String, Int](
Schedule.immediate.maxRetries(3), ResultPolicy.retryWhen(_ != "fatal error")), shouldPayFailureCost = _.fold(_ != "throttling error", _ => true))(unionOperation) ``` # Repeat The `repeat` functions allow to repeat an operation according to a given schedule (e.g. repeat 3 times with a 100ms interval and 50ms of initial delay). ## API The basic syntax for `repeat` is: ```scala import ox.scheduling.repeat repeat(schedule)(operation) ``` The `repeat` API uses `scheduled` underneath with DSL focused on repeats. See [scheduled](scheduled.md) for more details. ## Operation definition Similarly to the [retry](retries.md) API, the `operation` can be defined: * directly using a by-name parameter, i.e. `f: => T` * using a by-name `Either[E, T]` * or using an arbitrary [error mode](../basics/error-handling.md), accepting the computation in an `F` context: `f: => F[T]`. ## Configuration The `repeat` config requires a `Schedule`, which indicates how many times and with what interval should the `operation` be repeated. In addition, it is possible to define a custom `shouldContinueOnResult` strategy for deciding if the operation should continue to be repeated after a successful result returned by the previous operation (defaults to `_: T => true`). If an operation returns an error, the repeat loop will always be stopped. If an error handling within the operation is needed, you can use a `retry` inside it (see an example below) or use `scheduled` instead of `repeat`, which allows full customization. This is captured as a `RepeatConfig` instance. For convenience, the `repeat` method accepts either a full `RepeatConfig`, or just the `Schedule`. In the latter case, a default `RepeatConfig` is created, using the schedule. The [retry](retries.md) documentation includes an overview of the available ways to create and modify a `Schedule`. 
## Examples ```scala import ox.UnionMode import ox.scheduling.{Schedule, repeat, repeatEither, repeatWithErrorMode, RepeatConfig} import ox.resilience.{retry, RetryConfig} import scala.concurrent.duration.* def directOperation: Int = ??? def eitherOperation: Either[String, Int] = ??? def unionOperation: String | Int = ??? // various operation definitions - same syntax repeat(Schedule.immediate.maxAttempts(3))(directOperation) repeatEither(Schedule.immediate.maxAttempts(3))(eitherOperation) // various schedules repeat(Schedule.fixedInterval(100.millis).maxAttempts(3))(directOperation) repeat(Schedule.fixedInterval(100.millis).maxAttempts(3).withInitialDelay(50.millis))( directOperation) // infinite repeats with a custom strategy def customStopStrategy: Int => Boolean = ??? repeat(RepeatConfig(Schedule.fixedInterval(100.millis)) .copy(shouldContinueOnResult = customStopStrategy))(directOperation) // custom error mode repeatWithErrorMode(UnionMode[String])(Schedule.fixedInterval(100.millis).maxAttempts(3))( unionOperation) // repeat with retry inside repeat(Schedule.fixedInterval(100.millis).maxAttempts(3)) { retry(Schedule.exponentialBackoff(100.millis).maxAttempts(3))(directOperation) } ``` # Scheduled The `scheduled` functions allow to run an operation according to a given schedule. It is preferred to use `repeat`, `retry`, or combination of both functions for most use cases, as they provide a more convenient DSL. In fact `retry` and `repeat` use `scheduled` internally. ## Operation definition Similarly to the `retry` and `repeat` APIs, the `operation` can be provided: * directly using a by-name parameter, i.e. `f: => T` * using a by-name `Either[E, T]` * or using an arbitrary [error mode](../basics/error-handling.md), accepting the computation in an `F` context: `f: => F[T]`. 
## Configuration The `scheduled` config consists of: - a `Schedule`, which indicates how many times the `operation` should be run, provides a duration based on which a sleep is calculated and provides an initial delay if configured. - a `SleepMode`, which determines how the sleep between subsequent operations should be calculated: - `Interval` - default for `repeat` operations, where the sleep is calculated as the duration provided by schedule minus the duration of the last operation (can be negative, in which case the next operation occurs immediately). - `Delay` - default for `retry` operations, where the sleep is just the duration provided by schedule. - `afterAttempt` - a callback function that is invoked after each operation and determines if the scheduler loop should continue. Used for `onRetry`, `shouldContinueOnError`, `shouldContinueOnResult` and adaptive retries in `retry` API. Defaults to always continuing. ## Schedule See the [retry](retries.md) documentation for an overview of the available ways to create and modify a `Schedule`. ### Testing schedules Schedules can be tested by forcing the evaluation of `Schedule.intervals` and inspecting the resulting lazy list of intervals. # OxApp To properly handle application interruption and clean shutdown, Ox provides a way to define application entry points using `OxApp` trait. The application's main `run` function is then executed on a virtual thread, with a root `Ox` capability provided. Here's an example: ```scala import ox.* import scala.concurrent.duration.* object MyApp extends OxApp: def run(args: Vector[String])(using Ox): ExitCode = forkUser { sleep(500.millis) println("Fork finished!") } println(s"Started app with args: ${args.mkString(", ")}!") ExitCode.Success ``` When the application receives a SIGINT/SIGTERM, e.g. due to a CTRL+C, the root scope (and hence any child scopes and forks) are interrupted. 
This allows for a clean shutdown: any resources that are attached to scopes, or managed using `try-finally` blocks, are released. Application shutdown is handled by adding a `Runtime.addShutdownHook`. ```{warning} When using `OxApp`, within an Ox-managed fork (thread), do not use `System.exit` or `sys.exit` to shut down your application. This will cause a deadlock with the cleanup process. For more details see [#368](https://github.com/softwaremill/ox/issues/368). ``` In the code below, the resource is released when the application is interrupted: ```scala import ox.* object MyApp extends OxApp: def run(args: Vector[String])(using Ox): ExitCode = releaseAfterScope: println("Releasing ...") println("Waiting ...") never ``` The `run` function receives command line arguments as a `Vector` of `String`s, a given `Ox` capability and has to return an `ox.ExitCode` value which translates to the exit code returned from the program. `ox.ExitCode` is defined as: ```scala enum ExitCode(val code: Int): case Success extends ExitCode(0) case Failure(exitCode: Int = 1) extends ExitCode(exitCode) ``` There's also a simplified variant of `OxApp` for situations where you don't care about command line arguments. The `run` function doesn't take any arguments beyond the root `Ox` capability, expects no `ExitCode` and will handle any exceptions thrown by printing a stack trace and returning an exit code of `1`: ```scala import ox.* object MyApp extends OxApp.Simple: def run(using Ox): Unit = println("All done!") ``` `OxApp` has also a variant that integrates with [either](../basics/error-handling.md#boundary-break-for-eithers) blocks for direct-style error handling called `OxApp.WithEitherErrors[E]`. Here, `E` is the type of errors from the `run` function that you want to handle. 
The interesting bit is that the `run` function in `OxApp.WithEitherErrors` receives an `either` block token of type `EitherError[E]` (which itself is an alias for `Label[Either[E, ExitCode]]`, as `either` operates on the boundary/break mechanism). Therefore, it's possible to use `.ok()` combinators directly in the `run` function scope. `OxApp.WithEitherErrors` requires that one implements a function that translates application errors into `ExitCode` instances. Here's an example that always fails and exits with exit code `23`: ```scala import ox.* import ox.either.* sealed trait MyAppError case class ComputationError(msg: String) extends Exception(msg) with MyAppError object MyApp extends OxApp.WithEitherErrors[MyAppError]: def doWork(): Either[MyAppError, Unit] = Left(ComputationError("oh no")) def handleError(myAppError: MyAppError): ExitCode = myAppError match { case ComputationError(_) => ExitCode.Failure(23) } def run(args: Vector[String])(using Ox, EitherError[MyAppError]): ExitCode = doWork().ok() // will end the scope with MyAppError as `doWork` returns a Left ExitCode.Success ``` ## Additional configuration All `ox.OxApp` instances can be configured by overriding the `def settings: Settings` method. Settings include: * `interruptedExitCode`: what exit code should be returned by the application once it shuts down gracefully after it was interrupted (for example Ctrl+C was pressed by the user). By default `0` (graceful shutdown) * `handleException` and `handleInterruptedException`: callbacks for exceptions that occur when evaluating the application's body, or that are thrown when the application shuts down due to an interruption (SIGINT/SIGTERM). By default, the stack traces are printed to stderr, unless a default uncaught exception handler is defined. * `threadFactory`: the thread factory that is used to create threads in Ox scopes (`supervised`, `unsupervised` etc.). Useful e.g.
when integrating with third-party libraries to propagate context across (virtual) thread boundaries. * `shutdownTimeout`: the maximum amount of time a clean shutdown might take. By default 10 seconds. This might prevent deadlocks due to usage of `System.exit` in the user's code. After the timeout passes, the application will forcibly exit. Settings can be overridden: ```scala import ox.* import scala.concurrent.duration.* object MyApp extends OxApp: override def settings: OxApp.Settings = OxApp.Settings.Default.copy( interruptedExitCode = ExitCode.Failure(130) ) def run(args: Vector[String])(using Ox): ExitCode = sleep(60.seconds) ExitCode.Success ``` # Actors Actors in Ox enable invoking methods on an object serially, keeping the behavior as close as possible to a direct invocation. That is, even though invocations may happen from multiple threads, they are guaranteed to happen one after the other, not concurrently. Actor invocations are fully type-safe, with minimal overhead. They use [channels](../streaming/channels.md) and [scopes](../structured-concurrency/fork-join.md) behind the scenes. One of the use-cases is integrating with external APIs, which are represented by an object containing mutable state. Such integrations must be protected and cannot be accessed by multiple threads concurrently. ```{note} Note that actors as described below are a very basic implementation, covering only some use cases for local concurrency. In general, actors are especially useful when working in distributed or clustered systems, or when implementing patterns such as event sourcing. For these use-cases, see the [Pekko](https://pekko.apache.org) project. ``` An actor can be created given any value (representing the actor's state) using `Actor.create`. This creates a fork in the current concurrency scope, and a channel (using the `BufferCapacity` in scope) for scheduling invocations on the actor's logic.
The result is an `ActorRef`, using which invocations can be scheduled using either the `ask` or `tell` methods. ## Ask `ask` sends an invocation to the actor and awaits for a result. For example: ```scala import ox.supervised import ox.channels.* class Stateful: private var counter: Int = 0 def increment(delta: Int): Int = counter += delta counter supervised: val ref = Actor.create(new Stateful) ref.ask(_.increment(5)) // blocks until the invocation completes ref.ask(_.increment(4)) // returns 9 ``` If a non-fatal exception is thrown by the invocation, it's propagated to the caller, and the actor continues processing other invocations. Fatal exceptions (e.g. interruptions) are propagated to the enclosing actor's scope, and the actor closes - trying to create another invocation will throw an exception. In this approach, actor's internal state usually has to be mutable. For a more functional style, an actor's implementation can contain a state machine with a single mutable field, containing the current state; each invocation of an actor's method can then match on the current state, and calculate the next one. ## Tell It's also possible to schedule an invocation to be processed in the background using `.tell`. This method only blocks until the invocation can be sent to the actor's channel, but doesn't wait until it's processed. Note that any exceptions that occur when handling invocations scheduled using `.tell` will be propagated to the actor's enclosing scope, and will cause the actor to close. ## Close When creating an actor, it's possible to specify a callback that will be called uninterruptedly before the actor closes. Such a callback can be used to release any resources held by the actor's logic. 
It's called when the actor closes, which includes closing of the enclosing scope: ```scala import ox.{never, supervised} import ox.channels.* class Stateful: def work(howHard: Int): Unit = throw new RuntimeException("boom!") def close(): Unit = println("Closing") supervised: val ref = Actor.create(new Stateful, Some(_.close())) // fire-and-forget, exception causes the scope to close ref.tell(_.work(5)) // preventing the scope from closing never ``` # Circuit Breaker A circuit breaker is used to provide stability and prevent cascading failures in distributed systems. These should be used with other mechanisms (such as timeouts or rate limiters) to prevent the failure of a single component from bringing down all components. The Circuit Breaker can proactively identify unresponsive services and prevent repeated attempts. The circuit breaker allows controlling the execution of operations and stops them if certain conditions are met. CircuitBreaker is thread-safe and can be used in concurrent scenarios. ## API ```scala import ox.supervised import ox.resilience.* supervised: val circuitBreaker = CircuitBreaker() type T def operation: T = ??? val operationResult: Option[T] = circuitBreaker.runOrDrop(operation) ``` The CircuitBreaker is a finite state machine with three states: `Closed`, `Open` and `HalfOpen`. - While in `Open` state - all calls are dropped. - In `Closed` state - calls are accepted. - In `HalfOpen` state - only a configured number of calls can be started and, depending on their results, the state can go back to `Open` or `Closed`. See [conditions for state change](#conditions-for-state-change). ## Configuration Many config parameters relate to calculated metrics. Those metrics are the percentage of calls that failed and the percentage of calls that exceeded `slowCallDurationThreshold`. Which calls are included in the calculation of these metrics is determined by the `SlidingWindow` configuration. ### Sliding window There are two ways that metrics are calculated.
- Count based sliding window - `SlidingWindow.CountBased`, counts metrics based on last n call results. - Time based sliding window - `SlidingWindow.TimeBased`, counts metrics based on call results recorded in the lapse of duration before current time. ### Failure rate and slow call rate thresholds The state of the CircuitBreaker changes from `Closed` to `Open` when the failure rate is greater than or equal to a configurable threshold. For example when 80% of recorded call results failed. Failures are counted based on the provided `ErrorMode`. For example any exception that is thrown by the operation, when using the direct, "unwrapped" API or any `Left` variant when using `runOrDropEither`. The same state change also happens when the percentage of slow calls (exceeding the configurable `slowCallDurationThreshold`) is greater than or equal to the configured threshold. For example 80% of calls took longer than 10 seconds. Those metrics are considered only when the number of recorded calls is greater than or equal to `minimumNumberOfCalls`, otherwise we don't change state even if the failure rate is 100%. ### Parameters - `failureRateThreshold: PercentageThreshold` - percentage of recorded calls marked as failed required to switch to open state. - `slowCallThreshold: PercentageThreshold` - percentage of recorded calls marked as slow required to switch to open state. - `slowCallDurationThreshold: FiniteDuration` - duration that a call has to exceed to be marked as slow. - `slidingWindow: SlidingWindow` - mechanism to determine how calls are recorded. - `minimumNumberOfCalls: Int` - minimum number of calls recorded needed for the breaker to be able to switch to open state based on thresholds. - `waitDurationOpenState: FiniteDuration` - duration that CircuitBreaker will wait before switching from `Open` state to `HalfOpen`. - `halfOpenTimeoutDuration: FiniteDuration` - timeout for `HalfOpen` state after which, if not enough calls were recorded, the breaker will go back to `Open` state. Zero means there is no timeout.
- `numberOfCallsInHalfOpenState: Int` - number of calls recorded in the `HalfOpen` state needed to calculate metrics to decide whether the breaker should go back to the `Open` state or to `Closed`. It is also the maximum number of operations that can be started in this state.

`SlidingWindow` variants:

- `CountBased(windowSize: Int)` - This variant calculates metrics based on the last n recorded call results. These statistics are cleared on every state change.
- `TimeBased(duration: FiniteDuration)` - This variant calculates metrics of operations recorded within `duration` before the current time. These statistics are cleared on every state change.

### Providing configuration

CircuitBreaker can be configured during instantiation by providing `CircuitBreakerConfig`.

```scala
import ox.supervised
import ox.resilience.*
import scala.concurrent.duration.*

supervised:
  // using default config
  CircuitBreaker()

  // or
  CircuitBreaker(CircuitBreakerConfig.default)

  // custom config
  val config = CircuitBreakerConfig(
    failureRateThreshold = PercentageThreshold(50),
    slowCallThreshold = PercentageThreshold(50),
    slowCallDurationThreshold = 10.seconds,
    slidingWindow = SlidingWindow.CountBased(100),
    minimumNumberOfCalls = 20,
    waitDurationOpenState = 10.seconds,
    halfOpenTimeoutDuration = 0.millis,
    numberOfCallsInHalfOpenState = 10
  )

  // providing config for CircuitBreaker instance
  CircuitBreaker(config)
```

Values defined in `CircuitBreakerConfig.default`:

```
failureRateThreshold = PercentageThreshold(50)
slowCallThreshold = PercentageThreshold(50)
slowCallDurationThreshold = 10.seconds
slidingWindow = SlidingWindow.CountBased(100)
minimumNumberOfCalls = 20
waitDurationOpenState = 10.seconds
halfOpenTimeoutDuration = 0.millis
numberOfCallsInHalfOpenState = 10
```

## Conditions for state change

![state diagram](/_static/state-diagram-cb.svg)

1. State changes from `Closed` to `Open` after any threshold was exceeded (`failureRateThreshold` or `slowCallThreshold`) and the number of recorded calls is greater than or equal to `minimumNumberOfCalls`.
2. State changes from `Closed` to `HalfOpen` if any threshold was exceeded, the number of recorded calls is greater than or equal to `minimumNumberOfCalls` and `waitDurationOpenState` is zero.
3. State changes from `Open` to `HalfOpen` when `waitDurationOpenState` passes.
4. State changes from `HalfOpen` to `Open` if `halfOpenTimeoutDuration` passes without enough calls recorded, or if the number of recorded calls is equal to `numberOfCallsInHalfOpenState` and any threshold was exceeded.
5. State changes from `HalfOpen` to `Closed` if `numberOfCallsInHalfOpenState` calls were completed before the timeout and no threshold was exceeded.

```{note}
CircuitBreaker uses an actor internally, and since an actor executes on one thread, this may be a bottleneck. That means that calculating a state change can be delayed, and the breaker can let a few more operations complete before opening. This can be the case with many very fast operations.
```

## Examples

```scala
import ox.UnionMode
import ox.resilience.*
import ox.scheduling.Schedule
import ox.supervised
import scala.concurrent.duration.*

def directOperation: Int = ???
def eitherOperation: Either[String, Int] = ???
def unionOperation: String | Int = ???

supervised:
  val circuitBreaker = CircuitBreaker()

  // various operation definitions
  circuitBreaker.runOrDrop(directOperation)
  circuitBreaker.runOrDropEither(eitherOperation)

  // custom error mode
  circuitBreaker.runOrDropWithErrorMode(UnionMode[String])(unionOperation)

  // retry with circuit breaker inside
  retryEither(Schedule.exponentialBackoff(100.millis).maxRetries(3)){
    circuitBreaker.runOrDrop(directOperation) match
      case Some(value) => Right(value)
      case None => Left("Operation dropped")
  }
```

# Control flow methods

There are some helper methods which might be useful when writing code using Ox's concurrency operators:

* `forever { ... }` repeatedly evaluates the given code block forever
* `repeatWhile { ... }` repeatedly evaluates the given code block, as long as it returns `true`
* `repeatUntil { ... }` repeatedly evaluates the given code block, until it returns `true`
* `never` blocks the current thread indefinitely, until it is interrupted
* `checkInterrupt()` checks if the current thread is interrupted, and if so, throws an `InterruptedException`. Useful in compute-intensive code, which wants to cooperate in the cancellation protocol

All of these are `inline` methods, imposing no runtime overhead.

# Rate limiter

The rate limiter mechanism allows controlling the rate at which operations are executed. It ensures that at most a certain number of operations are run concurrently within a specified time frame, preventing system overload and ensuring fair resource usage. Several rate limiting algorithms are available, either taking into account only the start time of the operation, or the entire duration of its execution.

## API

Basic rate limiter usage:

```scala
import ox.supervised
import ox.resilience.*
import scala.concurrent.duration.*

supervised:
  val rateLimiter = RateLimiter.fixedWindowWithStartTime(2, 1.second)

  type T
  def operation: T = ???

  val blockedOperation: T = rateLimiter.runBlocking(operation)
  val droppedOperation: Option[T] = rateLimiter.runOrDrop(operation)
```

`runBlocking` will block the operation until the algorithm allows it to be executed. Therefore, the return type is the same as that of the operation. `runOrDrop`, on the other hand, drops the operation and returns `None` if the algorithm doesn't allow more operations to be executed; otherwise, the result of the successfully executed operation is wrapped in `Some`.

A rate limiter must be created within an `Ox` [concurrency scope](../structured-concurrency/fork-join.md), as a background fork is created, to replenish the rate limiter. Once the scope ends, the rate limiter is stopped as well.

## Operation definition

The `operation` can be provided directly using a by-name parameter, i.e. `f: => T`.

## Configuration

The configuration of a `RateLimiter` depends on an underlying algorithm that controls whether an operation can be executed or not. The following algorithms are available:

- `StartTimeRateLimiterAlgorithm.FixedWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in fixed windows of `per` duration.
- `StartTimeRateLimiterAlgorithm.SlidingWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in any window of time of duration `per`.
- `StartTimeRateLimiterAlgorithm.LeakyBucket(maximum: Int, per: FiniteDuration)` - where `maximum` is the maximum capacity of tokens available in the token bucket algorithm and one token is added each `per`. It can represent either the leaky bucket algorithm or the token bucket algorithm.
- `DurationRateLimiterAlgorithm.FixedWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations whose execution spans fixed windows of `per` duration. Considers the whole execution time of an operation. An operation spanning more than one window blocks permits in all windows that it spans.
- `DurationRateLimiterAlgorithm.SlidingWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations whose execution spans any window of time of duration `per`. Considers the whole execution time of an operation. An operation releases its permit once `per` has passed since the operation ended.

### API shorthands

You can use one of the following shorthands to define a Rate Limiter with the corresponding algorithm:

- `RateLimiter.fixedWindowWithStartTime(maxOperations: Int, window: FiniteDuration)`
- `RateLimiter.slidingWindowWithStartTime(maxOperations: Int, window: FiniteDuration)`
- `RateLimiter.leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)`
- `RateLimiter.fixedWindowWithDuration(maxOperations: Int, window: FiniteDuration)`
- `RateLimiter.slidingWindowWithDuration(maxOperations: Int, window: FiniteDuration)`

See the tests in `ox.resilience.*` for more.

## Custom rate limiter algorithms

The `RateLimiterAlgorithm` employed by `RateLimiter` can be extended to implement new algorithms or modify existing ones. Its interface is modelled like that of a `Semaphore`, although the underlying implementation could be different. For best compatibility with the existing interface of `RateLimiter`, the `acquire` and `tryAcquire` methods should offer the same guarantees as Java's `Semaphore`. There is also the method `def runOperation[T](operation: => T, permits: Int): T` for cases where considering the span of execution may be necessary (see implementations in `DurationRateLimiterAlgorithm`).

Additionally, there are two methods employed by the `RateLimiter` for updating its internal state automatically:

- `def update(): Unit`: Updates the internal state of the rate limiter to reflect its current situation. Invoked in a background fork repeatedly, when a rate limiter is created.
- `def getNextUpdate: Long`: Returns the time in nanoseconds after which a new `update` needs to be called.
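
To make the semaphore-like contract more concrete, here is a minimal, standalone sketch of a fixed-window algorithm. It deliberately does not extend Ox's `RateLimiterAlgorithm` (whose exact signatures aren't reproduced here); the class name `FixedWindowSketch` and the `perNanos` parameter are hypothetical, and only the method names mirror the ones mentioned above.

```scala
import java.util.concurrent.Semaphore

// Hypothetical, standalone illustration: NOT Ox's RateLimiterAlgorithm,
// it only mirrors the method names described in the text.
final class FixedWindowSketch(rate: Int, perNanos: Long):
  private val permits = new Semaphore(rate)
  @volatile private var windowStart: Long = System.nanoTime()

  // blocks until enough permits are available, like Semaphore.acquire
  def acquire(n: Int): Unit = permits.acquire(n)

  // never blocks; returns false once the current window is exhausted
  def tryAcquire(n: Int): Boolean = permits.tryAcquire(n)

  // starts a new window by restoring the permits used in the previous one
  def update(): Unit =
    windowStart = System.nanoTime()
    permits.release(rate - permits.availablePermits())

  // nanoseconds until the current window ends (never negative)
  def getNextUpdate: Long =
    math.max(0L, windowStart + perNanos - System.nanoTime())
```

In such a design, a background loop would wait for `getNextUpdate` nanoseconds and then call `update()`, which is the role the text assigns to the fork created together with the rate limiter.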
# Resources

## Single scoped resource

Ox provides convenience inline methods to allocate, use and (uninterruptibly) release resources with a try-finally block: `use` and `useCloseable`. For example:

```scala
import ox.useCloseable

useCloseable(new java.io.PrintWriter("test.txt")) { writer =>
  writer.println("Hello, world!")
}
```

If a concurrency scope is available (e.g. `supervised`), or there are multiple resources to allocate, consider using the approach described below, to avoid creating an additional syntactical scope.

Alternatively, you can use `useInterruptibly`, where the releasing might be interrupted, and which is equivalent to a `try`-`finally` block.

```{warning}
To properly release resources when the entire application is interrupted, make sure to use [`OxApp`](oxapp.md) as the application's main entry point.
```

## Within a concurrency scope

Resources can be allocated within a concurrency scope. They will be released in reverse acquisition order, after all forks started within the scope finish (but before the scope completes).
E.g.:

```scala
import ox.{supervised, useInScope}

case class MyResource(c: Int)

def acquire(c: Int): MyResource =
  println(s"acquiring $c ...")
  MyResource(c)

def release(resource: MyResource): Unit =
  println(s"releasing ${resource.c} ...")

supervised {
  val resource1 = useInScope(acquire(10))(release)
  val resource2 = useInScope(acquire(20))(release)
  println(s"Using $resource1 ...")
  println(s"Using $resource2 ...")
}
```

### Release-only

You can also register resources to be released (without acquisition logic), before the scope completes:

```scala
import ox.{supervised, releaseAfterScope}

case class MyResource(c: Int)

def release(resource: MyResource): Unit =
  println(s"releasing ${resource.c} ...")

supervised {
  val resource1 = MyResource(10)
  releaseAfterScope(release(resource1))
  println(s"Using $resource1 ...")
}
```

# Utilities

In addition to concurrency, error handling and resiliency features, Ox includes some utility methods, which make writing direct-style Scala code more convenient. When possible, these are `inline` methods taking `inline` parameters, hence incurring no runtime overhead.

Top-level methods:

* `uninterruptible { ... }` evaluates the given code block making sure it can't be interrupted
* `sleep(scala.concurrent.duration.Duration)` blocks the current thread/fork for the given duration; same as `Thread.sleep`, but using Scala's `Duration`
* `debug(expression)` prints the code representing the expression, and the value of the expression to standard output, using `println`.
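
As a minimal sketch of how the top-level methods might be combined (assuming the Ox `core` dependency is on the classpath; `flushBuffers` and `guardedWork` are hypothetical names used only for illustration):

```scala
import ox.*
import scala.concurrent.duration.*

// hypothetical cleanup step, used only for illustration
def flushBuffers(): Unit = println("flushing buffers")

// hypothetical unit of work: waits a bit, then always runs the cleanup
def guardedWork(): Int =
  try
    sleep(10.millis) // blocks the current thread/fork, like Thread.sleep
    42
  finally uninterruptible(flushBuffers()) // the cleanup can't be cut short by an interrupt

@main def utilitiesSketch(): Unit =
  val attempts = 3
  debug(attempts * 2) // prints the expression's code together with its value
  println(guardedWork())
```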
Extension functions on arbitrary expressions:

* `.discard` extension method evaluates the given code block and discards its result, avoiding "discarded non-unit value" warnings
* `.pipe(f)` applies `f` to the value of the expression and returns the result; useful for chaining operations
* `.tap(f)` applies `f` to the value of the expression and returns the original value; useful for side-effecting operations
* `.tapException(Throwable => Unit)` and `.tapNonFatalException(Throwable => Unit)` allow running the provided side-effecting callback when the expression throws an exception
* `.debug(label)` prints the value preceded by the given label, and returns the original value

Extension functions on `scala.concurrent.Future[T]`:

* `.get(): T` blocks the current thread/fork until the future completes; returns the successful value of the future, or throws the exception, with which it failed

## Examples

Debug utilities:

```scala
import ox.*

val x = 20
// x: Int = 20
val y = 10
// y: Int = 10

debug(x * 2 + y)
// MdocApp.this.x.*(2).+(MdocApp.this.y) = 50
```

```scala
import ox.*

def transform(n: Int): Long = n * n * n

transform(5).debug("transformation result")
// transformation result: 125
// res1: Long = 125L
```

# Kafka flows

Dependency:

```scala
"com.softwaremill.ox" %% "kafka" % "1.0.5"
```

`Flow`s which read from a Kafka topic, mapping stages and drains which publish to Kafka topics are available through the `KafkaFlow`, `KafkaStage` and `KafkaDrain` objects.

In all cases, Kafka producers and consumers can be provided:

* by manually creating (and closing) an instance of a `KafkaProducer` / `KafkaConsumer`
* through a `ProducerSettings` / `ConsumerSettings`, with the bootstrap servers, consumer group id, key/value serializers, etc. The lifetime is then managed by the flow operators.
* through a thread-safe wrapper on a consumer (`ActorRef[KafkaConsumerWrapper[K, V]]`), for which the lifetime is bound to the current concurrency scope

## Reading from Kafka

To read from a Kafka topic, use:

```scala
import ox.kafka.{ConsumerSettings, KafkaFlow, ReceivedMessage}
import ox.kafka.ConsumerSettings.AutoOffsetReset

val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092")
  .autoOffsetReset(AutoOffsetReset.Earliest)
val topic = "my_topic"

val source = KafkaFlow.subscribe(settings, topic)
  .runForeach { (msg: ReceivedMessage[String, String]) => ??? }
```

## Publishing to Kafka

To publish data to a Kafka topic:

```scala
import ox.flow.Flow
import ox.kafka.{ProducerSettings, KafkaDrain}
import ox.pipe
import org.apache.kafka.clients.producer.ProducerRecord

val settings = ProducerSettings.default.bootstrapServers("localhost:9092")
Flow
  .fromIterable(List("a", "b", "c"))
  .map(msg => ProducerRecord[String, String]("my_topic", msg))
  .pipe(KafkaDrain.runPublish(settings))
```

To publish data as a mapping stage:

```scala
import ox.flow.Flow
import ox.kafka.ProducerSettings
import ox.kafka.KafkaStage.*
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}

val settings = ProducerSettings.default.bootstrapServers("localhost:9092")
val metadatas: Flow[RecordMetadata] = Flow
  .fromIterable(List("a", "b", "c"))
  .map(msg => ProducerRecord[String, String]("my_topic", msg))
  .mapPublish(settings)

// process & run the metadatas flow further
```

## Reading & publishing to Kafka with offset commits

Quite often, data to be published to a topic (`topic1`) is computed based on data received from another topic (`topic2`). In such a case, it's possible to commit messages from `topic2` after the messages to `topic1` are successfully published.

In order to do so, a `Flow[SendPacket]` needs to be created.
The definition of `SendPacket` is:

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import ox.kafka.ReceivedMessage

case class SendPacket[K, V](
  send: List[ProducerRecord[K, V]],
  commit: List[ReceivedMessage[_, _]])
```

The `send` list contains the messages to be sent (each message is a Kafka `ProducerRecord`). The `commit` list contains the messages from which the data to be sent was computed. These are the received messages, as produced by a `KafkaFlow`. When committing, for each topic-partition that appears in the received messages, the maximum offset is computed. For example:

```scala
import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaFlow, ProducerSettings, SendPacket}
import ox.kafka.ConsumerSettings.AutoOffsetReset
import ox.*
import org.apache.kafka.clients.producer.ProducerRecord

val consumerSettings = ConsumerSettings.default("my_group")
  .bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")
val sourceTopic = "source_topic"
val destTopic = "dest_topic"

supervised:
  // the consumer is shared between the subscribe & offset stages
  // its lifetime is bound to the current concurrency scope
  val consumer = consumerSettings.toThreadSafeConsumerWrapper
  KafkaFlow
    .subscribe(consumer, sourceTopic)
    .map(in => (in.value.toLong * 2, in))
    .map((value, original) =>
      SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
    .pipe(KafkaDrain.runPublishAndCommit(producerSettings, consumer))
```

The offsets are committed every second in a background process.

## Reading from Kafka, processing data & committing offsets

Offsets can also be committed after the data has been processed, without producing any records to write to a topic.
For that, we can use the `runCommit` drain, or the `mapCommit` stage, both of which work with a `Flow[CommitPacket]`:

```scala
import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaFlow, CommitPacket}
import ox.kafka.ConsumerSettings.AutoOffsetReset
import ox.*

val consumerSettings = ConsumerSettings.default("my_group")
  .bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
val sourceTopic = "source_topic"

supervised:
  // the consumer is shared between the subscribe & offset stages
  // its lifetime is bound to the current concurrency scope
  val consumer = consumerSettings.toThreadSafeConsumerWrapper
  KafkaFlow
    .subscribe(consumer, sourceTopic)
    .mapPar(10) { in =>
      // process the message, e.g. send an HTTP request
      CommitPacket(in)
    }
    .pipe(KafkaDrain.runCommit(consumer))
```

# Cron scheduler

Dependency:

```scala
"com.softwaremill.ox" %% "cron" % "1.0.5"
```

This module allows running schedules based on cron expressions from [cron4s](https://github.com/alonsodomin/cron4s).

`CronSchedule` can be used in all places that require a `Schedule`, especially in [repeat](../scheduling/repeat.md) scenarios.

For defining `CronExpr` see the [cron4s documentation](https://www.alonsodomin.me/cron4s/userguide/index.html).

## API

The cron module exposes methods for creating a `Schedule` based on a `CronExpr`.

```scala
import ox.scheduling.repeat
import ox.scheduling.cron.*
import cron4s.*

// placeholder for the operation to be repeated
def operation: Int = ???

repeat(CronSchedule.unsafeFromString("10-35 2,4,6 * ? * *"))(operation)
```

## Operation definition

Methods from `ox.scheduling.cron.CronSchedule` define a `Schedule`, so they can be plugged into `RepeatConfig` and used with the `repeat` API.

## Configuration

All configuration beyond `CronExpr` is provided by the `repeat` API. If error handling within the operation is needed, you can use a `retry` inside it (see an example below) or use `scheduled` with `CronSchedule` instead of `repeat`, which allows full customization.
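
Since the `CronExpr` is the only piece of configuration supplied by this module, it can be worth validating it before building a schedule. The following is a hedged sketch, assuming cron4s's `Cron.parse` (which returns an `Either`) and that `CronSchedule.fromCronExpr` yields a `Schedule`, as described above; the `scheduleFrom` helper is hypothetical:

```scala
import cron4s.*
import ox.scheduling.Schedule
import ox.scheduling.cron.CronSchedule

// hypothetical helper: turns a user-supplied string into a Schedule, or an error message
def scheduleFrom(expression: String): Either[String, Schedule] =
  Cron.parse(expression) match
    case Right(cronExpr) => Right(CronSchedule.fromCronExpr(cronExpr))
    case Left(error)     => Left(s"Invalid cron expression '$expression': $error")
```

Compared to `unsafeFromString`, this keeps a malformed expression from throwing at the call site, at the cost of handling the `Left` case explicitly.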
## Examples

```scala
import ox.UnionMode
import ox.scheduling.cron.CronSchedule
import scala.concurrent.duration.*
import ox.resilience.{RetryConfig, retry}
import ox.scheduling.*
import cron4s.*

def directOperation: Int = ???
def eitherOperation: Either[String, Int] = ???
def unionOperation: String | Int = ???

val cronExpr: CronExpr = Cron.unsafeParse("10-35 2,4,6 * ? * *")

// various operation definitions - same syntax
repeat(CronSchedule.fromCronExpr(cronExpr))(directOperation)
repeatEither(CronSchedule.fromCronExpr(cronExpr))(eitherOperation)

// infinite repeats with a custom strategy
def customStopStrategy: Int => Boolean = ???
repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr), customStopStrategy))(directOperation)

// custom error mode
repeatWithErrorMode(UnionMode[String])(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(unionOperation)

// repeat with retry inside
repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr))) {
  retry(Schedule.exponentialBackoff(100.millis).maxRetries(3))(directOperation)
}
```

# Inheritable MDC using Logback

Dependency:

```scala
"com.softwaremill.ox" %% "mdc-logback" % "1.0.5"
```

Ox provides support for setting inheritable MDC (mapped diagnostic context) values, when using the [Logback](https://logback.qos.ch) logging library. Normally, values set using `MDC.put` aren't inherited across (virtual) threads, which includes forks created in concurrency contexts.

Inheritable values are especially useful e.g. when setting a correlation id in an HTTP request interceptor, or at any entrypoint to the application. Such correlation id values can then be added automatically to each log message, provided the appropriate log encoder pattern is used.

To enable using inheritable MDC values, the application's code should call `InheritableMDC.init` as soon as possible. The best place would be the application's entrypoint (the `main` method).
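
A minimal sketch of such an entrypoint, assuming the `mdc-logback` dependency (and Logback itself) is on the classpath; the `app` method name and the `println` placeholder are only for illustration:

```scala
import ox.logback.InheritableMDC

@main def app(): Unit =
  // enable inheritable MDC values before any logging or forking happens
  InheritableMDC.init

  // placeholder: the rest of the application would be started here
  println("application started")
```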
Once this is done, inheritable MDC values can be set in a scoped & structured manner using `InheritableMDC.supervisedWhere` and variants.

As inheritable MDC values use a [`ForkLocal`](../structured-concurrency/fork-local.md) under the hood, their usage restrictions apply: outer concurrency scopes should not be used to create forks within the scopes. Only newly created scopes, or the provided scope can be used to create forks. That's why `supervisedWhere`, `unsupervisedWhere` and `supervisedErrorWhere` methods are provided.

"Normal" MDC usage is not affected. That is, values set using `MDC.put` are not inherited, and are only available in the thread where they are set.

For example:

```scala
import org.slf4j.MDC

import ox.fork
import ox.logback.InheritableMDC

InheritableMDC.supervisedWhere("a" -> "1", "b" -> "2") {
  MDC.put("c", "3") // not inherited

  fork {
    MDC.get("a") // "1"
    MDC.get("b") // "2"
    MDC.get("c") // null
  }.join()

  MDC.get("a") // "1"
  MDC.get("b") // "2"
  MDC.get("c") // "3"
}
```

# Propagating OpenTelemetry context

Dependency:

```scala
"com.softwaremill.ox" %% "otel-context" % "1.0.5"
```

When using the default OpenTelemetry context-propagation mechanisms, which rely on thread-local storage, the context will not be propagated across virtual thread boundaries, e.g. when creating new forks as part of [`supervised`](../structured-concurrency/fork-join.md) scopes. This might lead to spans not being properly correlated into traces, or metrics without the appropriate context.

To fix this problem, the context must be propagated whenever a new virtual thread is created. One way to achieve this is by using a custom thread factory, provided by this module - `PropagatingVirtualThreadFactory`.
It can be set for the whole app when using [`OxApp`](../utils/oxapp.md), or manually through `oxThreadFactory`:

```scala
import ox.*
import ox.otel.context.PropagatingVirtualThreadFactory

object MyApp extends OxApp:
  override def settings: OxApp.Settings = OxApp.Settings.Default.copy(
    threadFactory = Some(PropagatingVirtualThreadFactory())
  )

  def run(args: Vector[String])(using Ox): ExitCode = ExitCode.Success
```

# HTTP client using sttp

[sttp-client](https://sttp.softwaremill.com) is a Scala HTTP client integrating with a number of Scala stacks, including direct-style backends.

The integration is included in the sttp-client library and includes:

* synchronous backends, which are designed to work with direct-style Scala
* streaming request & response bodies, converted to/from `Flow`s via `InputStream`s
* interacting with WebSockets, either using the `WebSocket` interface directly, or using `Flow`s of web socket frames
* receiving SSE streams as `Flow[ServerSentEvent]`

For more details refer to the sttp-client documentation, specifically:

* [synchronous backends documentation](https://sttp.softwaremill.com/en/latest/backends/synchronous.html)
* [usage examples](https://sttp.softwaremill.com/en/latest/examples.html), tagged with the `Direct` label

# HTTP server using Tapir

[Tapir](https://tapir.softwaremill.com) is a library for rapidly developing HTTP APIs. It integrates with a number of Scala stacks, including direct-style server interpreters.

The integration is included in the Tapir library and includes:

* exposing regular HTTP endpoints (consuming/producing JSON etc.)
* streaming request & response bodies, converted to/from `Flow`s via `InputStream`s
* web sockets represented as a transformation between incoming & outgoing `Flow`s of web socket frames
* SSE response bodies

For more details refer to the Tapir documentation, specifically:

* the [netty-server-sync interpreter documentation](https://tapir.softwaremill.com/en/latest/server/netty.html)
* [usage examples](https://tapir.softwaremill.com/en/latest/examples.html), tagged with the `Direct` label
* [tutorials](https://tapir.softwaremill.com/en/latest/tutorials/01_hello_world.html), which mostly use the direct-style approach

# Stability of modules

The modules are categorized using the following levels:

* **stable**: binary compatibility is guaranteed within a major version; adheres to semantic versioning
* **stabilizing**: the API is mostly stable, with rare binary-incompatible changes possible in minor releases (only if necessary)
* **experimental**: API can change significantly even in patch releases

The major version is increased when there are binary-incompatible changes in **stable** modules.

The minor version is increased when there are significant new features in **stable** modules (keeping compatibility), or binary-incompatible changes in **stabilizing** modules.

The patch version is increased when there are binary-compatible changes in **stable** / **stabilizing** modules, any changes in **experimental** modules, or when a new module is added (e.g. a new integration).

## Main modules

| Module                | Level        |
|-----------------------|--------------|
| core                  | stable       |
| flow-reactive-streams | stabilizing  |
| kafka                 | stabilizing  |
| mdc-logback           | stabilizing  |
| cron                  | stabilizing  |
| otel-context          | stabilizing  |

# Best practices

While working on Ox and integrating Ox into our other open-source projects, we found a couple of patterns and best practices which might be useful for anybody starting their journey with direct-style Scala and Ox.
## Make scopes as small as possible

If you end up using concurrency scopes such as `supervised`, make sure that their lifetime is as short as possible. In some cases it might be necessary to start a "global" scope (e.g. for application-wide, long-running tasks), but even if so, don't let the global scope leak to any other parts of your code, and isolate its usage, e.g. using [actors](../utils/actors.md).

For all other tasks, create short-lived scopes, which handle a single request, a message from a queue or a single job instance.

## Integrate with callback-based APIs using channels

Callback-based APIs, including "reactive" ones, are by their nature non-structured, and don't play well with structured concurrency. For such cases, [channels](../streaming/channels.md) are an ideal tool. Sending or receiving to/from a channel doesn't require any context, and can be done from any thread. On the other hand, processing the data that is on the channel often involves concurrency and creating threads, which can then be done in a structured way.

Note, however, that channel operations are potentially blocking. If you can't block, consider using a `select` with a default clause.

## Use `using Ox` sparingly

Passing the `Ox` capability gives the method the power to start new threads - which can be a dangerous tool! The goal of structured concurrency is to localise thread creation as much as possible, and disallow methods which create a thread as an effect. `using Ox` partially circumvents this guarantee, hence use this with caution, and pay attention not to pass it through several layers of method calls, which might make the code hard to understand.

## Use flows instead of channels

Transforming channels directly might lead to excessive concurrency, as each transformation typically starts a background fork, processing the data and sending it to a new channel. While this still performs well, as creating virtual threads & channels is cheap, it might incur an unnecessary overhead.
Instead, you can use [flows](../streaming/flows.md) and their high-level API, which allows inserting asynchronous boundaries when necessary, but otherwise runs the subsequent processing stages on the same thread.

## Avoid returning `Fork`

Accidental concurrency is often cited as a problem with using `Future`s: if you call two methods which return a `Future`, they will run concurrently, even though you might have never intended that.

The same problem can occur if a method returns Ox's `Fork`. Hence, avoid returning `Fork`s from methods. Instead, model concurrency on the caller's side - if something should be run in parallel, the caller can do so, using `supervised` and `fork`, and by calling blocking methods within the forks.

## Debugging virtual threads

Virtual threads are normally not visible when using tools such as `jstack` or IntelliJ's debugger. To inspect their stack traces, you'll need to create a thread dump to a file using `jcmd [pid] Thread.dump_to_file [file]`, or use IntelliJ's thread dump utility, when paused in the debugger.

## Dealing with uninterruptible stdin

Some I/O operations, like reading from stdin, block the thread on the `read` syscall and only unblock when data becomes available or the stream is closed; such a call is uninterruptible. The problem with stdin specifically is that it can't be easily closed, making it impossible to interrupt such operations directly. This pattern extends to other similar blocking operations that behave like stdin.

The solution is to delegate the blocking operation to a separate thread and use a [channel](../streaming/channels.md) for communication. This thread cannot be managed by Ox, as Ox always attempts to run cleanup on application shutdown, and that means interrupting all forks. Some blocking I/O can't, however, be interrupted on the JVM, and the advised way of dealing with that is to just close the resource, which in turn makes read/write methods throw an `IOException`.
In the case of stdin, closing it is usually not what you want to do. To work around that, you can sacrifice a thread; since receiving from a channel is interruptible, this makes the overall operation interruptible as well:

```scala
import ox.*, channels.*

import scala.io.StdIn

object stdinSupport:
  private lazy val chan: Channel[String] =
    val rvChan = Channel.rendezvous[String]

    Thread
      .ofVirtual()
      .start(() => forever(rvChan.sendOrClosed(StdIn.readLine()).discard))

    rvChan

  def readLineInterruptibly: String =
    try chan.receive()
    catch
      case iex: InterruptedException =>
        // Handle interruption appropriately
        throw iex
```

This pattern allows you to use stdin (or similar blocking operations) with Ox's timeout and interruption mechanisms, such as `timeoutOption` or scope cancellation.

Note that for better stdin performance, you can use `Channel.buffered` instead of a rendezvous channel, or even use `java.lang.System.in` directly and proxy raw data through the channel. Keep in mind that this solution leaks a thread that will remain blocked on stdin for the lifetime of the application. It's possible to avoid this trade-off by using libraries that employ JNI/JNA to access stdin, such as [JLine 3](https://jline.org/docs/intro), which can use raw mode with non-blocking or timeout-based reads, allowing the thread to be properly interrupted and cleaned up.

# Comparing Ox & functional effects

Ox is often compared and contrasted to functional effect systems. And for a good reason: libraries such as [cats-effect](https://typelevel.org/cats-effect/) or [ZIO](https://zio.dev) are often regarded as the gold standard when it comes to providing compile-time safety, error handling or composability.

They represent purely functional programming, where programs are defined in terms of lazily evaluated computation descriptions. Such descriptions form monads, and need monadic operators to compose them.
The specific programming styles include tagless-final (in case of cats-effect), fused monad (in case of ZIO), or algebraic effects (in case of [Kyo](https://getkyo.io/)).

However, using functional effect systems has its drawbacks and brings specific tradeoffs. You can find a deeper dive on how functional effects and direct style compare in the following talks:

* [Unwrapping IO: is it a path that you want to follow?](https://www.youtube.com/watch?v=qR_Od7qbacs)
* [Concurrency in Scala and on the JVM](https://www.youtube.com/watch?v=6RYn6mgq77s)
* [Effects: To Be Or Not To Be?](https://www.youtube.com/watch?v=sDnNjtkoUVs)

Here's a quick summary of some of the differences, from the perspective of Ox.

First off, what we gain when using Ox:

* Simpler syntax: we can use the Scala syntax directly to work with effectful computations, as compared to the monadic syntax of functional effects. For example, composing two computations in functional effects amounts to invoking `.flatMap` on the computation descriptions, while in direct style it's just two statements one after another (separated by an invisible `;`). This makes the code more readable, with less syntactic noise.
* Lower learning curve: the direct style is familiar to most programmers, while the monadic style requires understanding and getting used to. Composing programs in terms of lazily evaluated descriptions represented as values, instead of "directly", requires a mental switch, which is not always straightforward.
* Better debuggability: stack traces in functional effect systems are often not very useful, and carry little information helping to pinpoint the problem. In direct style, we get proper, "normal" stack traces.
* No virality: "wrappers" such as `Future` or the `IO` data type are "viral": once we call a method returning an `IO`, our method should return an `IO` as well. This is not always necessary in direct style.
* Ability to use built-in control flow constructs: in direct style, we can once again use `if`, `for`, `while`, `try` and other built-in control flow constructs. In functional effects, we need to use their special versions, which are functional-effect-aware. A prime example of such an operator is `traverse`.

What we retain:

* Fearless concurrency: both approaches offer high-level APIs for concurrency, such as `par` or `race`, as well as lower level APIs, to create and manage fibers/forks manually, along with their lifecycles.
* Supervision: both approaches allow supervising created fibers/forks, through some form of structured concurrency, and being notified whenever a fiber/fork fails, so that no errors go unnoticed.

What we partially lose:

* Principled error handling: the type signatures of methods when using functional effects are often more precise when it comes to the type of errors that the computation might end up with. In case of cats-effect, the presence of `IO` signals that the computation might involve blocking operations, which might throw exceptions. In case of ZIO, error handling is especially well-designed, and we get full information on the type of errors. Ox proposes using `Either`s to represent application-level errors, however there's no tracking of exceptions, blocking or IO operations (at least yet - that's an area we'd like to improve).
* Dedicated resource data type: while in Ox we do have various methods which e.g. attach resources to scopes, it's still possible to use resources in an unsafe way - the compiler won't warn us about that. In functional effects, resources are typically represented using a dedicated data type, which ensures that they are used safely. Of course, this is only useful if proper integration libraries are provided which expose resources using the appropriate data types, as otherwise without proper discipline it's still possible to use resources unsafely.
* Principled interruptions: in Ox and direct style on the JVM in general, we rely on the JVM interruption mechanism, which relies on injecting `InterruptedException`s. Poorly written libraries, or simply bugs in our code, might intercept such exceptions, and e.g. log them, instead of rethrowing. Using `NonFatal` instead of catch-alls is a way to avoid these problems, but again, this relies on discipline. On the other hand, in functional effect systems, interruptions are signalled using an out-of-band mechanism, which can't be intercepted and ignored.
* The representation of computations is no longer uniform. In functional effects, all computations are always represented as lazily evaluated descriptions. In direct style, and this includes Ox as well, sometimes we need to pass lazily evaluated code to some method which needs to control the passed code's evaluation. Hence, the eager/lazy distinction needs to be done manually.

What we lose:

* Referential transparency (RT) and "fearless refactoring": since computations are no longer represented as values, some refactorings are no longer safe (e.g. we cannot extract computations to `val`s). However, solving this problem might amount to using `def`s for side-effecting computations. The exact benefits of RT are still being discussed, with various ideas of providing similar guarantees in direct style (e.g. by tracking suspensions).

To finish off, it's an ongoing research process to understand the fundamental benefits that functional effect systems bring, and to pinpoint which of their characteristics are essential. We're constantly on the lookout for use-cases which can only be written elegantly or safely using one style, and not the other. If you do have such a use-case, please [share on the forum](https://softwaremill.community)!

# Comparing Ox & Gears

The [gears](https://github.com/lampepfl/gears) project is an experimental multi-platform library covering direct style concurrency in Scala.
The scope of both Ox and Gears is similar in places, however there are also some notable differences, listed below. Originally [posted on Reddit](https://www.reddit.com/r/scala/comments/1cdfaki/comment/l1c0pcn/): 1. The fundamental difference is in the timeframes and perspectives of the projects. With Ox, we are trying to provide people with the tools necessary to write direct-style Scala now (2024). On the other hand, Gears is more of a research project, and coupled to Project Caprese, which will still run until 2028. That's not to say that Gears won't have a stable release before then - we're not aware of Gears's release plan - however the development goals of both projects seem different. Gears is more in an exploratory phase, while in Ox we are looking at a shorter time-to-market. 2. Ox only targets the JVM 21+, while Gears targets JVM 21+ and Native. While we don't rule out adding Native support, if it will be possible, it's not our immediate goal, because of (1). 3. This also influences features such as capture checking. It seems Gears will want to use the capture checker pretty early. We're hoping to do the same in Ox at some point, but that's only after the capture checker is relatively complete, and available in a stable (maybe LTS?) Scala release. So this might still take some time. 4. The scope of Ox is a bit wider than just concurrency: we're also looking at resiliency and general direct style utilities. One could of course debate, is the specialised-library approach taken by Gears better, or the more broad one taken by Ox. But I don't think there's a universal answer to that. 5. The programming styles that both libraries offer are slightly different as well. In Gears, most of the provided functionalities operate on the level of Gears-`Future`s. While in Ox, you often operate on thunks `=> T` or `() => T`. The Gears approach is more general, however the Ox one is more "direct" for the common case. 6. 
That might be considered a small detail, but we think naming is important. Gears is centered around a `Future` abstraction, which is a distinct type from `scala.concurrent.Future`. This might create unnecessary confusion, plus in our opinion it's worthwhile to distinguish between [promise-like futures and thread-like futures](https://softwaremill.com/two-types-of-futures/). That's why in Ox we've got `Fork`s instead (which are a thread-like-future data type). However, you don't use that type often, because of (5) above.
7. Another rather fundamental difference is in our approach to error handling. In Ox, when you create a `supervised` scope with `fork`s inside, if any of the forks fails, the whole scope fails and re-throws this exception (a variant of let-it-crash). In Gears, the default is to have failed futures, and the errors are only discovered when `.join`ing them. Both approaches have their merits, however the Ox one, where you have to explicitly create unsupervised forks (using `forkUnsupervised`), seems safer: your code might crash, but you won't miss an error.
8. Speaking of error handling, Ox provides support for various `ErrorMode`s, that is situations where you want to represent errors-as-values (in addition to exceptions). We [propose a specific way](../basics/error-handling.md) to represent such application/logical errors (using `Either`s), with the built-in concurrency operators often having `Either`-variants, and by providing a boundary-break implementation for `Either`s. Gears might be getting something similar, so this might stop being an actual difference.
9. In Gears, there are two capabilities: `Async` and `Async.Spawn`. The first one represents a capability to suspend, the second - to fork. In Ox, we only have the `Ox` capability, which corresponds to `Async.Spawn`. You don't need a capability to suspend. This might be seen as a feature, or a bug.
On one hand, it might be useful to know that somewhere down the call chain, your code will want to suspend, and more importantly, to be interrupted. That's the kind of information you get with `Async` in Gears. On the other hand, our worry is that `using Async` will be the new `implicit ec: ExecutionContext`. We're not ruling out adding an `Async`-like capability to Ox, it might turn out to be the right thing to do, but we'd still like to explore some other options (as hinted in the [Scalar talk](https://www.youtube.com/watch?v=C3j4AsFcxmM), near the end).
10. Ox's Kotlin-inspired `Channel` & `select` [implementation](https://github.com/softwaremill/jox) is less flexible than the one in Gears (you can't nest selects that easily), however it might be more performant. Initial benchmarks of a WebSocket server using Java 21 & Ox streaming turned out better or matching the asynchronous implementations.

# Dictionary

How we use various terms throughout the codebase and the documentation (or at least try to):

Scopes:
* **concurrency scope**: either `supervised` (default), `supervisedError` (permitting application errors), or `unsupervised`
* scope **body**: the code block passed to a concurrency scope (the `supervised`, `supervisedError` or `unsupervised` method)

Types of forks:
* supervised / unsupervised
* daemon / user
* optionally, recognizing application errors

Fork lifecycle:
* within scopes, asynchronously running **forks** can be **started**
* after being started a fork is **running**
* then, forks **complete**: either a fork **succeeds** with a value, or a fork **fails** with an exception
* external **cancellation** (`Fork.cancel()`) interrupts the fork and waits until it completes; interruption uses JVM's mechanism of injecting an `InterruptedException`
* forks are **supervised** if they are run in a `supervised` scope, and not explicitly unsupervised (that is, started using `forkUnsupervised` or `forkCancellable`)

Scope lifecycle:
* a scope **ends**: when
unsupervised, the scope's body is entirely evaluated; when supervised, all user (non-daemon) & supervised forks complete successfully, or at least one user/daemon supervised fork fails, or an application error is reported. When the scope ends, all forks that are still running are cancelled * scope **completes**, once all forks complete and finalizers are run; then, the `supervised`, `supervisedError` or `unsupervised` method returns. Errors: * fork **failure**: when a fork fails with an exception * **application error**: forks might successfully complete with values which are considered application-level errors; such values are reported to the enclosing scope and cause the scope to end Other: * **computation combinator**: a method which takes user-provided functions and manages their execution, e.g. using concurrency, interruption, and appropriately handling errors; examples include `par`, `race`, `retry`, `timeout` Channels: * **values** can be **sent** to a channel, or **received** from a channel Flows: * when **run**, a flow **emits** **elements** # Blogs, videos, ... ## Blogs * [Prototype Loom-based concurrency API for Scala](https://softwaremill.com/prototype-loom-based-concurrency-api-for-scala/) * [Go-like channels using project Loom and Scala](https://softwaremill.com/go-like-channels-using-project-loom-and-scala/) * [Two types of futures](https://softwaremill.com/two-types-of-futures/) * [Supervision, Kafka and Java 21: what’s new in Ox](https://softwaremill.com/supervision-kafka-and-java-21-whats-new-in-ox/) * [Designing a (yet another) retry API](https://softwaremill.com/designing-a-yet-another-retry-api/) * [Handling errors in direct-style Scala](https://softwaremill.com/handling-errors-in-direct-style-scala/) * [Direct-style concurrent streaming](https://softwaremill.com/direct-style-concurrent-streaming/) ## Videos Coming up! 
# Performance Some performance tests have been done around channels, see: * [Limits of Loom's performance](https://softwaremill.com/limits-of-looms-performance/) * [Go-like selects using jox channels in Java](https://softwaremill.com/go-like-selects-using-jox-channels-in-java/) # 1. Error propagation in channels Date: 2023-10-30 ## Context What should happen when an error is encountered when processing channel elements? Should it be propagated downstream or re-thrown? ## Decision We chose to only propagate the errors downstream, so that they are eventually thrown when the source is discharged. Won't this design cause upstream channels / sources to operate despite the consumer being gone (because of the exception)? It depends on two factors: - whether there are any forks running in parallel to the failed one, - whether you only signal the exception downstream, or also choose to re-throw it. If there's only a single fork running at a time, it would terminate processing anyway, so it's enough to signal the exception to the downstream. If there are multiple forks running in parallel, there are two possible scenarios: 1. If you choose to re-throw the exception, it should cause the containing scope to finish (or a supervised fork to fail), cancelling any forks that are operating in the background. Any unused channels can then be garbage-collected. 2. If you choose not to re-throw, the forks running in parallel would be allowed to complete normally (unless the containing scope is finished for another reason). Internally, for the built-in `Source` operators, we took the latter approach, i.e. we chose not to re-throw and let the parallel forks complete normally. However, we keep in mind that they might not be able to send to downstream channel anymore - since the downstream might already be closed by the failing fork. ### Example Let's have a look at the error handling in `Source.mapParUnordered` to demonstrate our approach. 
This operator applies a mapping function to a given number of elements in parallel, and is implemented as follows:

```scala
def mapParUnordered[U](parallelism: Int)(f: T => U)(using Ox, BufferCapacity): Source[U] =
  val c = BufferCapacity.newChannel[U]
  val s = new Semaphore(parallelism)
  forkDaemon {
    supervised { // (1)
      repeatWhile {
        s.acquire()
        receive() match
          case ChannelClosed.Done => false
          case ChannelClosed.Error(r) => // (2)
            c.error(r)
            false
          case t: T @unchecked =>
            fork { // (3)
              try
                c.send(f(t)) // (4)
                s.release()
              catch case e: Exception => c.error(e) // (5)
            }
            true
      }
    }
    c.done()
  }
  c
```

It first creates a `supervised` scope (1), i.e. one that only completes (on the happy path) when all non-daemon supervised forks complete. The mapping function `f` is then run in parallel using non-daemon `fork`s (3).

Let's assume an input `Source` with 4 elements, and `parallelism` set to 2:

```scala
val input: Source[Int] = Source.fromValues(1, 2, 3, 4)

def f(i: Int): Int =
  if i == 3 then throw new RuntimeException("boom") // fails immediately on the third element
  else
    Thread.sleep(100) // fixed delay; the exact duration is an arbitrary choice
    i

val result: Source[Int] = input.mapParUnordered(2)(f)
```

Let's also assume that the mapping function `f` is an identity with a fixed delay, but it's going to fail immediately (by throwing an exception) when it processes the third element.

In this scenario, the first 2-element batch would successfully process elements `1` and `2`, and emit them downstream (i.e. to the `result` source). Then the forks processing `3` and `4` would start in parallel. While `4` would still be processed (due to the delay in `f`), the fork processing `3` would immediately throw an exception, which would be caught (5). Consequently, the downstream channel `c` would be closed with an error, but the fork processing `4` would remain running. Whenever the fork processing `4` is done executing `f`, its attempt to `c.send` (4) will fail silently - due to `c` being already closed. Eventually, no results from the second batch would be sent downstream.
The sequence of events would be similar if it was the upstream (rather than `f`) that failed, i.e. when `receive()` resulted in an error (2).

# 2. Retries

Date: 2023-11-30

## Context

How should the [retries API](../scheduling/retries.md) be implemented in terms of:
- developer friendliness,
- supported ways of representing the operation under retry,
- possibly infinite retries.

## Decision

We're using a single, unified syntax to retry an operation:

```scala
retry(operation)(policy)
```

so that the developers don't need to wonder which variant to use.

The operation can be a function returning a result directly, or wrapped in a `Try` or `Either`. Therefore, there are three overloaded variants of the `retry` function.

For possibly infinite retries, we're using tail recursion to be stack safe. This comes at a cost of some code duplication in the retry logic, but is still more readable and easier to follow than a `while` loop with `var`s for passing the state.

# 3. Why source operators do not throw

Date: 2024-01-25

## Context

Revisiting ADR #1, what should happen when an error is encountered when processing channel elements? Should it be propagated downstream or re-thrown?

## Decision

In addition to what's mentioned in ADR #1, operators don't throw, but propagate, because we want to allow a throw-free coding style. When errors are propagated, on error every daemon operator thread shuts down, and we end the scope gracefully.

Additionally, we assume that data only flows downstream - and this includes errors.

# 4. Channels: safe/unsafe Operations

Date: 2024-02-28

## Context

Channel operations such as `send`, `receive`, `select`, `close` etc. might fail because a channel is closed. How should this be signalled to the user?
## Decision

We decided to have two variants of the methods:

* default: `send`, `receive` etc., which throw an exception when the channel is closed
* safe: `sendSafe`, `receiveSafe` etc., which return a `ChannelClosed` value when the channel is closed

The "safe" variants are more performant: no stack trace is created when the channel is closed. They are used by all channel combinators (such as `map`, `filter` etc.), to detect and propagate the errors downstream.

### Why not `Either` or `Try`?

To avoid allocations on each operation (e.g. receive). Channels might be on the "hot path" and they might be important for performance. Union types provide a nice alternative here.

Even with `Either`, though, if e.g. `send` had a signature `Either[ChannelClosed, Unit]`, discarding the result would at most be a warning (not in all cases), so potentially an error might go unnoticed.

### Why is the default to throw?

Let's consider `send`. If the default were `send(t: T): ChannelClosed | Unit`, with an additional exception-throwing variant `sendUnsafe(t: T): Unit`, then the API would be quite surprising.

A new user coming to the library could just call send / receive. The compiler might warn them in some cases that they discard the non-unit result of `send`, but (a) would they pay attention to those warnings, and (b) would they get them in the first place (this type of compiler warning isn't detected in 100% of cases).

In other words - it would be quite easy to mistakenly discard the results of `send`, so a default which guards against that (by throwing exceptions) is better, and the "safe" version can always be used intentionally if that's what's needed.

### Update 17/04/2024

The `...Safe` operations got renamed to `...OrClosed` or `...OrError`, as they can still throw `InterruptedException`s.

# 5. Application errors

Date: 2024-03-05

## Context

In some cases, it's useful to treat some return values as errors, which should cause the enclosing scope to end.
## Decision

For computation combinators, which include `par`, `race` and `supervised`, we decided to introduce the concept of application errors. These are values of a shape defined by an `ErrorMode`, which are specially treated by Ox - if such a value represents an error, the enclosing scope ends.

Some design limitations include:

* we want normal scopes to remain unchanged
* methods requiring a concurrency scope (that is, `using Ox`) should be callable from the new scope
* all forks that might report application errors must be constrained to return the same type of application errors
* computation combinators, such as `par`, should have a single implementation both when using application errors and exceptions only

Taking this into account, we separate the `Ox` capability, which allows starting forks, and `OxError`, which additionally allows reporting application errors. An inheritance hierarchy, `OxError <: Ox`, ensures that we can call methods requiring the `Ox` capability if `OxError` is available, but not the other way round.

Finally, introducing a special `forkError` method allows us to require that it is run within a `supervisedError` scope and that it must return a value of the correct shape.

# 6. Actors

Date: 2024-03-26

## Context

Motivated by the Kafka integration, it's often useful to call methods on an object with guaranteed serialisation of access, just as it happens in actors, which protect their mutable state.

## Decision

The current implementation of actors is very simple, and allows sending any thunk to be executed given the current actor's state. This forces the internal state to be mutable. Such an approach was chosen because of its simplicity, and how well it fits the motivating Kafka use-case, but it might need revisiting once more use-cases arise.

An alternative implementation would force each actor invocation to return the updated actor state, in addition to the value that should be returned to the caller (if any).
However, it's not clear then how to combine this with the type-safe syntax of invoking actors (or "sending messages" to them). For each method `T.m(args ...): U` that is accessible via `ActorRef[T]`, the actor itself would need to have a `TA.ma(args ...): S => (U, S)` method, where `S` is the actor's state. The fact that the `T` and `TA` types "match" in this way could probably be verified using a macro, but would be harder to implement by users and more complex.

While the idea is that the thunks passed to `ActorRef.ask` and `ActorRef.tell` should invoke a single method on the actor's interface (similar to "sending a message"), this is not actually verified. As an improvement, these methods could be changed to a macro that would verify the shape of the lambda passed to them:

```scala
def doAsk[T, U: Type](f: Expr[T => U], c: Expr[Sink[MethodInvocation]])(using Quotes): Expr[U] =
  import quotes.reflect.*
  '{
    val cf = new CompletableFuture[U]()
    val onResult = (v: Any) => { val _ = cf.complete(v.asInstanceOf[U]) }
    val onException = (e: Throwable) => { val _ = cf.completeExceptionally(e) }
    $c.send(${
      f.asTerm match {
        case Inlined(_, _, Block(List(DefDef(_, _, _, Some(Apply(Select(_, method), parameters)))), _)) =>
          '{ MethodInvocation(${ Expr(method) }, ${ Expr.ofList(parameters.map(_.asExpr)) }, onResult, onException) }
        case _ =>
          report.errorAndAbort(s"Expected a method call in the form _.someMethod(param1, param2), but got: ${f.show}")
      }
    })
    cf.get()
  }
```

Another limitation of this implementation is that it's not possible to schedule messages to self, as using the actor's `ActorRef` from within the actor's implementation can easily lead to a deadlock (always, if the invocation would be an `ask`, and with some probability if it would be a `tell` - when the actor's channel would become full).
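To make the state-returning alternative discussed above more concrete, here is a minimal sketch using only the Scala standard library. All names (`CounterState`, `CounterTransitions`, `PureActor`) are hypothetical and not part of Ox; the sketch only illustrates the `S => (U, S)` shape, where each invocation returns both a reply and the updated state, and it uses a plain lock rather than a channel to serialise access.

```scala
// Hypothetical names throughout: an illustration of the S => (U, S) shape, not Ox's actor API.
final case class CounterState(value: Int)

// Each "method" is expressed as a transition returning the reply and the updated state.
object CounterTransitions:
  def increment(by: Int): CounterState => (Unit, CounterState) =
    s => ((), CounterState(s.value + by))

  def get: CounterState => (Int, CounterState) =
    s => (s.value, s)

// Applies transitions one at a time; a lock stands in for the actor's serialised mailbox.
final class PureActor[S](initial: S):
  private var state: S = initial

  def ask[U](transition: S => (U, S)): U = synchronized {
    val (reply, next) = transition(state)
    state = next // the only place where the state is replaced
    reply
  }
```

With this shape, the actor's logic itself stays free of mutation, at the cost of every method having to thread the state through its result, which is the ergonomic problem described above.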
Finally, error handling might be implemented differently - so that each exception thrown by the actor's methods would be propagated to the actor's enclosing scope, and would close the actor's channel. While this is the only possibility in case of `.tell`, as otherwise the exception would go unnoticed, in case of `.ask` only fatal exceptions are propagated this way. Non-fatal ones are propagated to the caller, keeping with the original assumption that using an actor should be as close as possible to calling the method directly (which would simply propagate the exception).

# 7. Supervised & unsupervised scopes

Date: 2024-04-17

## Context

Originally, Ox had only `scoped` which created non-supervised scopes, that is errors were only discovered via explicit joining. This was later changed by introducing `supervised` and `unsupervised` scopes, where the former would end immediately when any fork failed, and the latter would not. However, `supervised` scopes have an overhead: they create an additional fork, in which the scope's main body is run. Is it possible to avoid this extra fork?

## Decision

In short: no.

An alternate design would be to store the thread that created the scope as part of the supervisor, and when any exception occurs, interrupt that thread so that it would discover the exception. However, this would be prone to interruption-races with external interruptions of that main thread. Even if we included an additional flag, specifying if the interruption happened because the scope ends, it would still be possible for an external interrupt to go unnoticed (if it happened at the same time as the internal one). Even though unlikely, such a design would be fragile, hence we are keeping the current implementation.

## Consequences

To make our design more type-safe, we split the `Ox` capability into `OxUnsupervised` (allowing only unsupervised forks), and `Ox`.

# 8.
Retries

Date: 2024-07-09

## Context

How should the [retries](../scheduling/retries.md) and [repeat](../scheduling/repeat.md) APIs share a common implementation?

## Decision

We're introducing [scheduled](../scheduling/scheduled.md) as a common API for both retries and repeats.

In addition, the `Schedule` trait and its implementations are decoupled from the retry DSL, so that they can be used for repeating as well. The `retry` API remains unchanged, but it now uses `scheduled` underneath.

Also, `repeat` functions have been added as sugar for `scheduled`, with a DSL focused on repeating.

The main difference between `retry` and `repeat` is about the interpretation of the duration provided by the `Schedule` (delay vs interval).
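The delay vs interval distinction can be illustrated with a small model. This is a sketch under stated assumptions, not Ox's implementation: it assumes that a "delay" is a pause counted from the end of the previous run, while an "interval" is the target distance between the starts of consecutive runs (with the next run starting immediately when the operation takes longer than the interval). The function names and the millisecond-based parameters are hypothetical.

```scala
// Hypothetical model (not Ox code): start times, in milliseconds, of successive runs
// of an operation that always takes `opMillis`, given a fixed schedule duration.

// "Delay" reading: sleep for the full schedule duration after each run finishes.
def startTimesWithDelay(runs: Int, opMillis: Long, scheduleMillis: Long): List[Long] =
  List.iterate(0L, runs)(start => start + opMillis + scheduleMillis)

// "Interval" reading: aim for starts that are `scheduleMillis` apart;
// if the operation overruns the interval, the next run starts right away.
def startTimesWithInterval(runs: Int, opMillis: Long, scheduleMillis: Long): List[Long] =
  List.iterate(0L, runs)(start => start + math.max(opMillis, scheduleMillis))
```

For a 30 ms operation and a 100 ms schedule duration, the first model spaces starts 130 ms apart and the second 100 ms apart, which is why the same `Schedule` can behave differently depending on how its duration is interpreted.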