Testing a background process with ZIO

ZIO is an awesome library for dealing with the complicated parts of effects, most notably concurrency and asynchronicity.

Sometimes you want to create a batch-like process, i.e. a fiber that runs in the background, never terminates, and performs some effect on a trigger, such as a period of time elapsing or an external event arriving. And of course you want to test its semantics. Fortunately, ZIO provides a test environment with a test clock that you can adjust to your needs. Here it becomes apparent why the clock effect is explicit in ZIO: you can control it!

Note that for the whole example, the imports are:
import zio._
import zio.clock._
import zio.duration._
import zio.test.environment._
import java.util.concurrent.TimeUnit 
The whole code is available here:
So, let’s build a trivial Batch system:
object Batch {
 /*
  * A batch is a process that loops forever (`forever`), is forked in the background
  * and properly detached from its parent fiber (`forkDaemon`), and executes an
  * effect each time a trigger fires.
  */
 def createBatch(effect: ZIO[Clock, Nothing, Unit]): URIO[Clock, Fiber[Nothing, Nothing]] = {
   // let's say that in our case, the trigger is just a "some time passed"
   val trigger = UIO.unit.delay(5.minutes)
   // the whole batch
   (trigger *> effect).forever.forkDaemon
 }
} 
So now, we want to test it. For that, we are going to use the ZIO test environment:
ZIO.accessM[Clock with TestClock] { testClock =>
 def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
  // now use testClock or adjust
}.provideLayer(testEnvironment) 
Often, when you want to test effects that are normally spread over some period of time, you build a record of what happens. A record is mutable state, so you put it in a Ref:
/*
* Create a batch whose effect is to store the current time in a `Ref` and print something out
*/
def demoBatch(r: Ref[List[Long]]) = Batch.createBatch(
 // access what is needed from environment
 ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t =>
   // actual effect
   UIO.effectTotal(println(s"done at $t")) *> r.update(t :: _)
 ) }
) 

OK, so we have a batch that records its triggering time in a Ref at each of its executions.

Perfect, let’s test that by advancing the test clock by 12 minutes, which is enough for two iterations:
val prog1 = ZIO.accessM[Clock with TestClock] { testClock =>
 def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
 for {
   r <- Ref.make(List.empty[Long])
   _ <- demoBatch(r)
   _ <- adjust(12.minutes)
   l <- r.get
 } yield l
}.provideLayer(testEnvironment) 

Great! In a couple of lines, we are ready to test our batch by injecting a test clock and adjusting time to what we need.

The only remaining part is to run the program and check for recorded values:
def main(args: Array[String]): Unit = {
 val l = Runtime.default.unsafeRun(prog1)
 // the newest record is prepended, so the expected list is 10 :: 5 :: Nil
 assert(l == 10 :: 5 :: Nil, s"Got unexpected list: $l")
} 
Run it, and… TADA!
done at 5
done at 10
Exception in thread "main" java.lang.AssertionError: assertion failed: Got unexpected list: List()
    at scala.Predef$.assert(Predef.scala:282)
    at MainTest1$.main(MainClock.scala:55)
    at MainTest1.main(MainClock.scala)
    ...

Process finished with exit code 1 

You got an error.
So, what’s going on? We see both prints, but the list is empty.

Actually, what happens is a race condition. Concurrency is hard, and that’s why you leave it to others.

The race is between two sides. On one side is the time it takes adjust to tell the fibers “hey, time has passed, do what you are supposed to do”, plus the time our batch fiber takes to actually do it. On the other side is r.get. And r.get is really fast, faster than all that fiber business.
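Stripped of the clock and the batch, the race has roughly the following shape. This is only a minimal sketch, assuming ZIO 1.x; the object `RaceSketch` and the value `racy` are hypothetical names used for illustration, not part of the original code.

```scala
import zio._

object RaceSketch {
  // Hypothetical miniature of prog1: a background fiber writes to a Ref
  // while the main fiber reads it straight away.
  val racy: UIO[Int] =
    for {
      r <- Ref.make(0)
      _ <- r.set(1).fork // background write, like the batch recording its time
      v <- r.get         // nothing orders this read after the write
    } yield v            // either 0 or 1, depending on scheduling

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(racy))
}
```

Nothing in this program forces the read to wait for the write, so both results are legitimate outcomes.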

So you can work around the race condition with something like this:
val prog2 = ZIO.accessM[Clock with TestClock] { testClock =>
 def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
 for {
   r <- Ref.make(List.empty[Long])
   _ <- demoBatch(r)
   _ <- adjust(12.minutes)
   _ <- UIO.effectTotal(Thread.sleep(5000))
   l <- r.get
 } yield l
}.provideLayer(testEnvironment) 
This time, if you run prog2, you get the expected result:
done at 5
done at 10

Process finished with exit code 0 

Success! Really, success?
No, because you introduced nondeterminism into your test. Under execution constraints such as load, the background fiber may sometimes need more time than the sleep allows, and the test will fail. Or, as in this case, you massively overestimate the sleep duration, so your tests take much longer than needed, with most of that time spent sleeping.

So, what’s the correct solution? As always with concurrency problems, the correct answer is to encode your protocol by forcing a synchronisation point for each execution of your batch. This is tedious, since it means that you need to count the executions explicitly, but it’s the only way toward determinism.
To force synchronisation, the simplest way is to use a Queue and call one take for each offer.
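The “one take for each offer” rule can be sketched independently of the clock. The helper `takeExactly` below is a hypothetical name, not a ZIO API; it only combines `Queue#take` and `ZIO.collectAll`, assuming ZIO 1.x.

```scala
import zio._

object TakeExactly {
  // Hypothetical helper: suspend until exactly `n` elements have been offered,
  // performing one `take` per expected `offer`.
  def takeExactly[A](q: Queue[A], n: Int): UIO[List[A]] =
    ZIO.collectAll(List.fill(n)(q.take))

  val demo: UIO[List[Long]] =
    for {
      q  <- Queue.unbounded[Long]
      _  <- (q.offer(5L) *> q.offer(10L)).fork // producer running in the background
      xs <- takeExactly(q, 2)                  // waits until both offers have arrived
    } yield xs

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(demo)) // prints List(5, 10)
}
```

Because each `take` suspends until an element is available, the consumer cannot get ahead of the producer, whatever the scheduling.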

So, for the batch, you will have:
def demoBatch(q: Queue[Long]) = Batch.createBatch(
 // access what is needed from environment
 ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t =>
   // actual effect
   UIO.effectTotal(println(s"done at $t")) *> q.offer(t).unit
 ) }
) 
And your program becomes:
val prog3 = ZIO.accessM[Clock with TestClock] { testClock =>
 def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
 for {
   q <- Queue.unbounded[Long]
   _ <- demoBatch(q)
   _ <- adjust(12.minutes)
   l <- q.take.zip(q.take) // NOT `takeAll`, else you don't force synchronisation two times!
 } yield List(l._1, l._2) // oldest first, and typed as List[Long] rather than List[Any]
}.provideLayer(testEnvironment) 
This time, you have a deterministic test that runs in the minimum possible time. And remember: concurrency is hard. In the same spirit as date or crypto algorithms, just don’t try to build your concurrency primitives yourself: use a library that comes with the batteries included.
Francois ARMAND
