Handling closed channels

By default, the Sink.send and Source.receive methods throw a ChannelClosedException if the channel is already closed:

enum ChannelClosedException(cause: Option[Throwable]) extends Exception(cause.orNull):
  case Error(cause: Throwable) extends ChannelClosedException(Some(cause))
  case Done() extends ChannelClosedException(None)

Alternatively, you can call Sink.sendSafe or Source.receiveSafe, which return union types:

trait Source[+T]:
  def receive(): T
  def receiveSafe(): T | ChannelClosed

trait Sink[-T]:
  def send(value: T): Unit
  def sendSafe(value: T): Unit | ChannelClosed
  def done(): Unit
  def doneSafe(): Unit | ChannelClosed
  def error(cause: Throwable): Unit
  def errorSafe(cause: Throwable): Unit | ChannelClosed

sealed trait ChannelClosed
object ChannelClosed:
  case class Error(reason: Option[Exception]) extends ChannelClosed
  case object Done extends ChannelClosed

That is, the result of a safe operation is either a value or an indication that the channel is closed.
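One way to consume such a union-typed result is plain pattern matching. The sketch below is only an illustration: it redefines ChannelClosed locally (copying the definition above) so that it compiles without the library, and describe is a hypothetical helper, not part of the API.

```scala
// Local stand-in mirroring the ChannelClosed definition shown above,
// so that this sketch is self-contained (assumption: no library on the classpath).
sealed trait ChannelClosed
object ChannelClosed:
  case class Error(reason: Option[Exception]) extends ChannelClosed
  case object Done extends ChannelClosed

// Hypothetical helper: turns the result of a safe receive into a message.
def describe(result: Int | ChannelClosed): String =
  result match
    case ChannelClosed.Done =>
      "channel done"
    case ChannelClosed.Error(reason) =>
      // The reason is optional, so fall back to a generic label.
      s"channel failed: ${reason.map(_.getMessage).getOrElse("unknown")}"
    case value: Int =>
      s"received $value"
```

With a real channel, the argument would be the result of receiveSafe(); here any Int or ChannelClosed value can be passed directly, for example describe(42) or describe(ChannelClosed.Done).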

Using extension methods from ChannelClosedUnion, it’s possible to convert such union types to Eithers, Trys, or exceptions, as well as to map over such results.
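To illustrate the idea behind such a conversion, here is a hand-written sketch of turning a union-typed result into an Either. It is not the library's implementation and does not use ChannelClosedUnion: toEitherSketch is a hypothetical name, and ChannelClosed is redefined locally (copying the definition above) so that the snippet compiles on its own.

```scala
// Local stand-in mirroring the ChannelClosed definition shown above
// (assumption: the sketch is compiled without the library).
sealed trait ChannelClosed
object ChannelClosed:
  case class Error(reason: Option[Exception]) extends ChannelClosed
  case object Done extends ChannelClosed

// Hypothetical helper, not the library's extension method:
// a closed-channel marker becomes a Left, any other value becomes a Right.
def toEitherSketch[T](result: T | ChannelClosed): Either[ChannelClosed, T] =
  result match
    case closed: ChannelClosed => Left(closed)
    // Everything that is not a ChannelClosed must be a T; the cast is unchecked
    // because T is erased at runtime.
    case value => Right(value.asInstanceOf[T])
```

For example, toEitherSketch[Int](7) yields Right(7), while toEitherSketch[Int](ChannelClosed.Done) yields Left(ChannelClosed.Done). Once in Either form, the result can be processed with the usual map, flatMap, and fold combinators.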