A tour of ox

Run two computations in parallel:

def computation1: Int = { sleep(2.seconds); 1 }
def computation2: String = { sleep(1.second); "2" }
val result1: (Int, String) = par(computation1, computation2)
// (1, "2")

Timeout a computation:

def computation3: Int = { sleep(2.seconds); 1 }
val result2: Either[Throwable, Int] = either.catching(timeout(1.second)(computation3))
// `timeout` only completes once the loosing branch is interrupted & done

Race two computations:

def computation4: Int = { sleep(2.seconds); 1 }
def computation5: Int = { sleep(1.second); 2 }
val result3: Int = raceSuccess(computation4, computation5)
// as before, the loosing branch is interrupted & awaited before returning a result

Structured concurrency & supervision:

// equivalent of par
supervised {
  val f1 = fork { sleep(2.seconds); 1 }
  val f2 = fork { sleep(1.second); 2 }
  (f1.join(), f2.join())

Error handling within a structured concurrency scope:

supervised {

    throw new RuntimeException("boom!")

Retry a computation:

def computationR: Int = ???
retry(RetryConfig.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(computationR)

Repeat a computation:

def computationR: Int = ???

Rate limit computations:

  val rateLimiter = RateLimiter.fixedWindowWithStartTime(2, 1.second)
  rateLimiter.runBlocking({ /* ... */ })

Allocate a resource in a scope:

supervised {
  val writer = useCloseableInScope(new java.io.PrintWriter("test.txt"))
  // ... use writer ...
} // writer is closed when the scope ends (successfully or with an error)

Create an app which shuts down cleanly when interrupted with SIGINT/SIGTERM:

object MyApp extends OxApp:
  def run(args: Vector[String])(using Ox): ExitCode =
    // ... your app's code ...
    // might use fork {} to create top-level background threads

Simple type-safe actors:

class Stateful { def increment(delta: Int): Int = ??? }

  val ref = Actor.create(new Stateful)
  // ref can be shared across forks, but only within the concurrency scope

Create a simple flow & transform using a functional API:

Flow.iterate(0)(_ + 1) // natural numbers
  .filter(_ % 2 == 0)
  .map(_ + 1)
  // compute the running total
  .mapStateful(0) { (state, value) =>
    val newState = state + value
    (newState, newState)
  .runForeach(n => println(n.toString))

Create flows which perform I/O and manage concurrency:

def sendHttpRequest(entry: String): Unit = ???

Merge two flows, properly handling the failure of either branches:

val f1 = Flow.tick(123.millis, "left")
val f2 = Flow.tick(312.millis, "right")

Integrate flow with other components using an imperative API:

def readNextBatch(): List[String] = ???
Flow.usingEmit { emit =>

Use completable high-performance channels for inter-fork communication within concurrency scopes:

val c = Channel.buffered[String](8)

Select from Go-like channels:

val c = Channel.rendezvous[Int]
val d = Channel.rendezvous[Int]
select(c.sendClause(10), d.receiveClause)

Unwrap eithers and combine errors in a union type:

val v1: Either[Int, String] = ???
val v2: Either[Long, String] = ???

val result: Either[Int | Long, String] = either:
  v1.ok() ++ v2.ok()

Pipe & tap values to functions to use the dot-syntax:

def compute: Int = ???
def computeMore(v: Int): Long = ???
  .pipe(2 * _)

Dive into the specific documentation sections for more details, variants and functionalities!