Channels

A channel is like a queue (values can be sent and received), but channels additionally support:

  • completion (a source can be done)

  • downstream error propagation

  • selecting exactly one channel clause to complete, where clauses include send and receive operations

Creating a channel is a light-weight operation:

import ox.channels.*
val c = Channel.bufferedDefault[String]

This uses the default buffer size (16). It’s also possible to create channels with other buffer sizes, as well as rendezvous or unlimited channels:

import ox.channels.*
val c1 = Channel.rendezvous[String]
val c2 = Channel.buffered[String](5)
val c3 = Channel.unlimited[String]

In rendezvous channels, a sender and receiver must “meet” to exchange a value. Hence, .send always blocks, unless there’s another thread waiting on a .receive. In buffered channels, .send only blocks when the buffer is full. In an unlimited channel, sending never blocks.
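The blocking behaviour described above can be observed on a single thread. The following is a minimal sketch, not part of the library: `bufferedDemo` is a made-up name used only for illustration. With a buffer of 2, two sends complete with no receiver present.

```scala
import ox.channels.*

// Hypothetical helper, for illustration only.
def bufferedDemo(): (String, String) =
  val c = Channel.buffered[String](2)
  c.send("a") // the buffer has room, so this returns immediately
  c.send("b") // the buffer is now full
  // A third c.send(...) here would block until a receiver frees a slot.
  // With Channel.rendezvous, even the first send would block on this thread,
  // as there is no other thread waiting on a .receive.
  val first = c.receive()
  val second = c.receive()
  (first, second)
```

Values are received in the order in which they were sent, so the sketch returns the pair `("a", "b")`.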

Channels implement two traits: Source and Sink.

Sinks

Data can be sent to a channel using Sink.send. Once no more data items are available, completion can be signalled downstream using Sink.done. If there’s an error when producing data, this can be signalled using Sink.error:

import ox.{fork, supervised}
import ox.channels.*

val c = Channel.rendezvous[String]
supervised:
  fork:
    c.send("Hello")
    c.send("World")
    c.done()

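  // each receive blocks until the fork above sends the next value
  println(c.receive()) // "Hello"
  println(c.receive()) // "World"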

.send blocks the calling thread, so channels are usually shared across forks to pass data between them. Since Ox is designed to work with Java 21+ and virtual threads, blocking is cheap and can be done frequently.

Sources

A source can be used to receive elements from a channel.

trait Source[+T]:
  def receive(): T

Like .send, the .receive method might block the current thread.

Creating sources

Sources can be created by instantiating a new channel. There are also some basic factory methods on the Source companion object. Finally, a flow can be run to a channel if needed, e.g.:

import ox.supervised
import ox.channels.Source
import ox.flow.Flow

import scala.concurrent.duration.*

supervised:
  Source.fromValues(1, 2, 3)
  Flow.tick(1.second, "x").runToChannel()
  Flow.iterate(0)(_ + 1).runToChannel() // natural numbers

Typically, for each source created as shown above, a fork is started which sends elements to the channel when capacity is available. When the enclosing supervised scope ends, each such fork is cancelled.
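A source created this way is consumed like any other channel. As a small sketch (the function name `fromValuesDemo` is an assumption made for this example), the values can be received inside the same scope that created the source:

```scala
import ox.supervised
import ox.channels.*

// Hypothetical helper, for illustration only.
def fromValuesDemo(): List[Int] =
  supervised:
    // a background fork sends 1, 2, 3 to the channel
    val source = Source.fromValues(1, 2, 3)
    // each receive blocks until the next value is available
    List(source.receive(), source.receive(), source.receive())
```

Receiving happens inside the supervised block because the fork that feeds the source is cancelled once the scope ends.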

Handling closed channels

By default, the Sink.send and Source.receive methods throw a ChannelClosedException if the channel is already closed:

enum ChannelClosedException(cause: Option[Throwable]) extends Exception(cause.orNull):
  case Error(cause: Throwable) extends ChannelClosedException(Some(cause))
  case Done() extends ChannelClosedException(None)
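The two cases can be told apart when catching the exception. The sketch below assumes the enum shape shown above; `describeReceive` is a hypothetical helper, not a library method.

```scala
import ox.channels.*

// Hypothetical helper: attempts a single receive and reports the outcome.
def describeReceive(source: Source[Int]): String =
  try
    val v = source.receive()
    s"received $v"
  catch
    case ChannelClosedException.Done()   => "channel is done"
    case ChannelClosedException.Error(t) => s"channel failed: ${t.getMessage}"

// A channel completed normally, with nothing buffered.
def doneChannel(): Channel[Int] =
  val c = Channel.buffered[Int](1)
  c.done()
  c

// A channel closed with an error.
def failedChannel(): Channel[Int] =
  val c = Channel.buffered[Int](1)
  c.error(new RuntimeException("boom"))
  c
```

Calling `describeReceive(doneChannel())` should take the `Done()` branch, and `describeReceive(failedChannel())` the `Error` branch, carrying the original cause.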

Alternatively, you can call Sink.sendSafe or Source.receiveSafe, which return union types:

trait Source[+T]:
  def receive(): T
  def receiveSafe(): T | ChannelClosed

trait Sink[-T]:
  def send(value: T): Unit
  def sendSafe(value: T): Unit | ChannelClosed
  def done(): Unit
  def doneSafe(): Unit | ChannelClosed
  def error(cause: Throwable): Unit
  def errorSafe(cause: Throwable): Unit | ChannelClosed

sealed trait ChannelClosed
object ChannelClosed:
  case class Error(reason: Throwable) extends ChannelClosed
  case object Done extends ChannelClosed

That is, the result of a safe operation might be a value, or information that the channel is closed.
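Such a result can be handled with an ordinary pattern match. The following sketch drains a channel of integers until it is closed; `drain` and `drainExample` are names invented for this example.

```scala
import ox.channels.*
import scala.annotation.tailrec

// Hypothetical helper: collects values until the channel reports that it is closed.
@tailrec
def drain(source: Source[Int], acc: List[Int] = Nil): (List[Int], ChannelClosed) =
  source.receiveSafe() match
    case closed: ChannelClosed => (acc.reverse, closed) // Done or Error
    case v: Int                => drain(source, v :: acc)

def drainExample(): (List[Int], ChannelClosed) =
  val c = Channel.buffered[Int](3)
  c.send(1)
  c.send(2)
  c.done() // values already buffered are still delivered before Done
  drain(c)
```

No exception is thrown here: the closed state comes back as a regular value, which the caller can inspect.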

Using extension methods from ChannelClosedUnion, it’s possible to convert such union types to Eithers, Trys or exceptions, as well as map over such results.
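To illustrate what such a conversion amounts to, here is a hand-written stand-in for the Either case. It is not the library's extension method: `toEitherSketch` and `eitherDemo` are hypothetical names, and the exact names offered by ChannelClosedUnion should be checked in the API docs.

```scala
import ox.channels.*

// Hand-written sketch, NOT the ChannelClosedUnion API.
def toEitherSketch[T](result: T | ChannelClosed): Either[ChannelClosed, T] =
  result match
    case closed: ChannelClosed => Left(closed)
    // anything that is not ChannelClosed must be the T half of the union
    case value                 => Right(value.asInstanceOf[T])

def eitherDemo(): (Either[ChannelClosed, Int], Either[ChannelClosed, Int]) =
  val c = Channel.buffered[Int](1)
  c.send(42)
  val open = toEitherSketch[Int](c.receiveSafe())
  c.done()
  val closed = toEitherSketch[Int](c.receiveSafe())
  (open, closed)
```

The first element holds the received value on the right, the second holds the closed marker on the left.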