ZIO is an awesome library for dealing with the complicated parts of effects, most notably concurrency and asynchronicity.
Sometimes, you want to create a “batch”-like process, i.e. a fiber that runs in the background, never terminates, and executes some effect on a trigger, such as a period of time elapsing or the reception of an external event. And of course you want to test its semantics. Fortunately, ZIO provides a test environment with a test clock that you can adjust to your needs. Here, it becomes apparent why the clock effect is explicit in ZIO: you can control it!
import zio._
import zio.clock._
import zio.duration._
import zio.test.environment._
import java.util.concurrent.TimeUnit
object Batch {
/*
* A batch is a forever-looping process (`forever`), forked in the background
* and properly detached from its parent fiber (`forkDaemon`), that executes an
* effect each time a trigger is yielded.
*/
def createBatch(effect: ZIO[Clock, Nothing, Unit]): URIO[Clock, Fiber[Nothing, Nothing]] = {
// let's say that in our case, the trigger is just a "some time passed"
val trigger = UIO.unit.delay(5.minutes)
// the whole batch
(trigger *> effect).forever.forkDaemon
}
}
First, get the test clock from the ZIO test environment:
ZIO.accessM[Clock with TestClock] { testClock =>
def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
// now use testClock or adjust
}.provideLayer(testEnvironment)
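To see what controlling the clock means in practice, here is a minimal sketch, assuming ZIO 1.0.x with zio-test on the classpath. It reads the current time, adjusts the test clock by 12 minutes, and reads the time again. The names `ClockSketch` and `readAdjustRead` are made up for this illustration. Since the test clock starts at zero, I would expect the two readings to be 0 and 12 minutes.

```scala
import zio._
import zio.clock._
import zio.duration._
import zio.test.environment._
import java.util.concurrent.TimeUnit

// Hypothetical demo object, not part of the batch code in this post.
object ClockSketch {
  // Read the time, move the test clock forward, read the time again.
  val readAdjustRead: UIO[(Long, Long)] =
    ZIO.accessM[Clock with TestClock] { env =>
      val clockService = env.get[Clock.Service]
      val testClock    = env.get[TestClock.Service]
      for {
        before <- clockService.currentTime(TimeUnit.MINUTES)
        _      <- testClock.adjust(12.minutes)
        after  <- clockService.currentTime(TimeUnit.MINUTES)
      } yield (before, after)
    }.provideLayer(testEnvironment)

  def main(args: Array[String]): Unit = {
    val (before, after) = Runtime.default.unsafeRun(readAdjustRead)
    println(s"before=$before, after=$after")
  }
}
```

No fiber is sleeping here, so nothing races with the second read. The interesting part starts once a background fiber is waiting on that same clock.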
Then, define a batch whose effect stores the current time in a Ref:
/*
* Create a batch whose effect is to store the current time in a `Ref` and write something out
*/
def demoBatch(r: Ref[List[Long]]) = Batch.createBatch(
// access what is needed from environment
ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t =>
// actual effect
UIO.effectTotal(println(s"done at $t")) *> r.update(t :: _)
) }
)
OK, so we have a batch that records its triggering time in a Ref at each of its executions. Let's advance the clock by 12 minutes and read the Ref:
val prog1 = ZIO.accessM[Clock with TestClock] { testClock =>
  def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
  for {
    r <- Ref.make(List.empty[Long])
    _ <- demoBatch(r)
    _ <- adjust(12.minutes)
    l <- r.get
  } yield l
}.provideLayer(testEnvironment)
Great! In a couple of lines, we are ready to test our batch by injecting a test clock and adjusting time to what we need.
def main(args: Array[String]): Unit = {
val l = Runtime.default.unsafeRun(prog1)
assert(l == 10 :: 5 :: Nil, s"Got unexpected list: ${l}")
}
done at 5
done at 10
Exception in thread "main" java.lang.AssertionError: assertion failed: Got unexpected list: List()
at scala.Predef$.assert(Predef.scala:282)
at MainTest1$.main(MainClock.scala:55)
at MainTest1.main(MainClock.scala)
...
Process finished with exit code 1
You got an error.
So, what’s going on? We see both prints, but the list is empty.
What actually happens is a race condition. Concurrency is hard, which is why you would rather leave it to others.
The race is between two things. On one side, adjust has to tell the fibers “hey, time has passed, do what you should”, and our batch fiber has to actually do it. On the other side, there is r.get, which is really fast, faster than all that fiber business.
A tempting fix is to sleep for a while before reading the Ref:
val prog2 = ZIO.accessM[Clock with TestClock] { testClock =>
def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
for {
r <- Ref.make(List.empty[Long])
_ <- demoBatch(r)
_ <- adjust(12.minutes)
_ <- UIO.effectTotal(Thread.sleep(5000))
l <- r.get
} yield l
}.provideLayer(testEnvironment)
With prog2, you get the expected result:
done at 5
done at 10
Process finished with exit code 0
Success! Really, success?
No, because you introduced nondeterminism into your test. Sometimes, due to execution constraints like load, the background fiber may need more time to react to the adjustment than the sleep allows. Or, as in this case, you massively overestimated the sleep duration, so your tests take much longer than needed, with most of that time spent sleeping.
So, what’s the correct solution? As always with concurrency problems, the correct answer is to encode your protocol by forcing a synchronisation point for each execution of your batch. This is tedious, since it means that you need to count those executions explicitly, but it’s the only way to get determinism.
To force synchronisation, the simplest way is to use a Queue and call one take for each offer.
def demoBatch(q: Queue[Long]) = Batch.createBatch(
// access what is needed from environment
ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t =>
// actual effect
UIO.effectTotal(println(s"done at $t")) *> q.offer(t).unit
) }
)
val prog3 = ZIO.accessM[Clock with TestClock] { testClock =>
def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
for {
q <- Queue.unbounded[Long]
_ <- demoBatch(q)
_ <- adjust(12.minutes)
l <- q.take.zip(q.take) // NOT `takeAll`, else you don't force synchronisation two times!
} yield l.productIterator.toList
}.provideLayer(testEnvironment)
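With more than two executions, chaining take by hand gets tedious. As a sketch of how the counting could be factored out, assuming ZIO 1.0.x, the hypothetical helper `takeExactly` below (not part of ZIO) performs exactly n sequential take calls. The background fiber here offers two fixed values instead of running the real batch, only to keep the example self-contained.

```scala
import zio._

// Hypothetical helper and demo, named for this illustration only.
object TakeExactlySketch {
  // One `take` per expected `offer`: each take suspends until an element
  // is available, so every expected execution is a synchronisation point.
  def takeExactly[A](q: Queue[A], n: Int): UIO[List[A]] =
    ZIO.collectAll(List.fill(n)(q.take))

  // Stand-in for the batch: a background fiber offering two values in order.
  val demo: UIO[List[Long]] =
    for {
      q <- Queue.unbounded[Long]
      _ <- (q.offer(5L) *> q.offer(10L)).fork
      l <- takeExactly(q, 2)
    } yield l

  def main(args: Array[String]): Unit = {
    val l = Runtime.default.unsafeRun(demo)
    assert(l == List(5L, 10L), s"Got unexpected list: $l")
  }
}
```

In prog3, `q.take.zip(q.take)` could then be written as `takeExactly(q, 2)`, which also yields a `List[Long]` directly instead of going through `productIterator`.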