Handling closed channels
By default, the Sink.send and Source.receive methods throw a ChannelClosedException if the channel is already
closed:
enum ChannelClosedException(cause: Option[Throwable]) extends Exception(cause.orNull):
  case Error(cause: Throwable) extends ChannelClosedException(Some(cause))
  case Done() extends ChannelClosedException(None)
Alternatively, you can call Sink.sendSafe or Source.receiveSafe, which return union types:
trait Source[+T]:
  def receive(): T
  def receiveSafe(): T | ChannelClosed

trait Sink[-T]:
  def send(value: T): Unit
  def sendSafe(value: T): Unit | ChannelClosed
  def done(): Unit
  def doneSafe(): Unit | ChannelClosed
  def error(cause: Throwable): Unit
  def errorSafe(cause: Throwable): Unit | ChannelClosed

sealed trait ChannelClosed
object ChannelClosed:
  case class Error(reason: Option[Exception]) extends ChannelClosed
  case object Done extends ChannelClosed
That is, the result of a safe operation might be a value, or information that the channel is closed.
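As a rough illustration, a result of type Int | ChannelClosed can be inspected with an ordinary pattern match. The sketch below does not depend on the library: it redeclares ChannelClosed as a stand-in that mirrors the definition above, and describe is a hypothetical helper introduced only for this example.

```scala
// Stand-in definitions mirroring the types shown above; not the library's own code.
sealed trait ChannelClosed
object ChannelClosed:
  case class Error(reason: Option[Exception]) extends ChannelClosed
  case object Done extends ChannelClosed

// Hypothetical helper: turns the outcome of a receiveSafe()-style call into a message.
def describe(result: Int | ChannelClosed): String =
  result match
    case ChannelClosed.Done =>
      "channel is done"
    case ChannelClosed.Error(reason) =>
      s"channel failed: ${reason.map(_.getMessage).getOrElse("unknown cause")}"
    case n: Int =>
      s"received $n"
```

With a real channel, the argument would presumably come from source.receiveSafe(); here any Int or ChannelClosed value can be passed directly.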
Using the extension methods from ChannelClosedUnion, it’s possible to convert such union types to Either or Try
values, or to exceptions, as well as to map over such results.
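To make the idea of such a conversion concrete, here is a hand-written sketch of turning a union into an Either. It is not the ChannelClosedUnion API: toEitherSketch is a hypothetical name chosen for this example, and ChannelClosed is again redeclared as a stand-in so that the snippet is self-contained.

```scala
// Stand-in definitions mirroring the types shown above; not the library's own code.
sealed trait ChannelClosed
object ChannelClosed:
  case class Error(reason: Option[Exception]) extends ChannelClosed
  case object Done extends ChannelClosed

// Hypothetical extension (an assumption, not the library's method):
// a closed-channel marker goes to Left, any other value goes to Right.
extension [T](result: T | ChannelClosed)
  def toEitherSketch: Either[ChannelClosed, T] =
    result match
      case closed: ChannelClosed => Left(closed)
      // T is erased at runtime, so this cast is unchecked; it is safe here
      // because the ChannelClosed alternative was handled in the case above.
      case value => Right(value.asInstanceOf[T])

val received: Int | ChannelClosed = 5
val finished: Int | ChannelClosed = ChannelClosed.Done
```

Calling received.toEitherSketch wraps the value in a Right, while finished.toEitherSketch wraps the marker in a Left. An Either can then be chained with map and flatMap like any other, which is one reason to prefer it over matching on the union at every call site.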