Transforming channels

Transforming eagerly

Sources can be transformed by receiving values, manipulating them and sending to other channels - this provides the highest flexibility and allows creating arbitrary channel topologies.

Some basic channel-transformaing operations are available as methods on Source. For example:

import ox.supervised
import ox.channels.{Channel, Source}

supervised:
  val c = Channel.rendezvous[String]
  val c2: Source[Int] = c.map(s => s.length())

The .map needs to be run within a scope, as it starts a new virtual thread (using fork), which:

  • immediately starts receiving values from the given source

  • applies the given function

  • sends the result to the new channel

The new channel is returned to the user as the return value of .map.

To run multiple transformations within one virtual thread / fork, the .transform method is available:

import ox.supervised
import ox.channels.Source

supervised:
  Source.fromIterable(1 to 1000)
    .transform(_.filter(_ % 2 == 0).map(_ + 1).take(10)) // take the 10 first even numbers, incremented by 1
    .foreach(n => println(n.toString))

Note

For more advanced transformation options, use flows.

Capacity of transformation stages

Most source and some flow transformation methods create new channels, on which the transformed values are produced. The capacity of these channels by default is 16 (buffered). This can be overridden by providing BufferCapacity given, e.g.:

(v: Source[Int]).map(_ + 1)(using BufferCapacity(10))

Transforming lazily

A limited number of transformations can be applied to a source without creating a new channel and a new fork, which computes the transformation. These include: .mapAsView, .filterAsView and .collectAsView.

For example:

import ox.channels.{Channel, Source}

val c = Channel.rendezvous[String]
val c2: Source[Int] = c.mapAsView(s => s.length())

The mapping function (s => s.length()) will only be invoked when the source is consumed (using .receive() or select), on the consumer’s thread. This is in contrast to .map, where the mapping function is invoked on a separate fork.

Hence, creating views doesn’t need to be run within a scope, and creating the view itself doesn’t consume any elements from the source on which it is run.

Discharging channels

Values of a source can be discharged using methods such as .foreach, .toList, .pipeTo or .drain:

import ox.supervised
import ox.channels.Source

supervised:
  val s = Source.fromValues(1, 2, 3)
  s.toList: List[Int] // List(1, 2, 3)

These methods are blocking, as they drain the channel until no more values are available (when the channel is done).

Closed channels (done / error)

If the channel encounters an error, the discharging method will throws a ChannelClosedException.Error. Similarly as with send and receive, there’s a safe variant for each discharing method, which returns a union type, e.g.:

import ox.supervised
import ox.channels.{ChannelClosed, Source}

supervised:
  val s = Source.fromValues(1, 2, 3)
  s.toList: List[Int] | ChannelClosed.Error // List(1, 2, 3)