The Fairest Kotlin Coroutines Operator

February 14, 2021 | 3 min read
  • #  kotlin
  • #  coroutines
  • Puzzle

    What do you think, do the test below pass or fail?

    @Test
    fun `combine passes or fails`() = runBlockingTest {
        val letterFlow = MutableSharedFlow<String>()
        val digitFlow = MutableSharedFlow<Int>()
    
        var actual = ""
        val job = launch {
            combine(letterFlow, digitFlow) { letter, digit -> letter + digit }
                .collect { actual += "$it " }
        }
    
        letterFlow.emit("A")
        digitFlow.emit(1)
        digitFlow.emit(2)
        letterFlow.emit("B")
        advanceUntilIdle()
    
        assertEquals("A1 A2 B2 ", actual)
    
        job.cancelAndJoin()
    }
    

    Recently I stumbled upon this interesting issue. I was sure that the test must pass, but it failed with the actual execution result A1 B1 . Whaaat?! 😮

    Investigation

    Subsequent experiments showed that the same issue was reproducible with a MutableStateFlow, which in some cases could be solved by applying the buffer() operator onto the letter and digit flows, before combining them.

    In contrast, when I used cold flows instead of the hot flows, the test always passed just fine.

    Another interesting observation was that the code in the test did work well in a real app. It only failed in tests. That puzzled me more and more.

    The suspect under investigation was the combine operator. Couple of hours and cups of tea later, the combine operator was nominated to be the fairest coroutine operator of them all. The reason for that was the yield() call inside the FlowCollector<R>.combineInternal() method of the coroutines implementation. The comment next it says that combine yields here to emulate fairness.

    for (i in 0 until size) {
        launch {
            flows[i].collect { value ->
                resultChannel.send(Update(i, value))
                yield() // Emulate fairness, giving each flow chance to emit
            }
        }
    }
    

    Exactly this fairness of combine operator was the reason for its weird behavior in the test, and here is why.

    The test runs in a single-threaded environment. Kotlin coroutines support concurrency also while running on a single thread. For this, combine operator calls yield() inside its collecting for-loop to give other suspended coroutines a chance to resume and perform their job. Emitting multiple values into the hot shared flows one after another seemed to misuse that fairness and block internal event-loop, preventing combine from running on each value emission. This caused the combine operator to skip values and misbehave.

    Solution

    After it became clear that the excessive fairness of the combine operator was causing the issue, the solution was fairly easy to find. I just had to force emit operator to emulate fairness in return by yielding after emitting a value, similar to how combine operator does it. This is how fairEmit operator was born.

    suspend fun <T> MutableSharedFlow<T>.fairEmit(value: T) {
        emit(value)
        yield()
    }
    

    The final replacement of the emit operator with the new fairEmit one made the test pass just fine.

    Conclusion

    Despite solving the issue, I wouldn’t be that confident in running coroutines code in a single-threaded environment without prior testing. Who knows what other surprises Kotlin coroutines yield.

    I hope that the simple trick described above saved your some time and energy, and you enjoyed it. Have fun and happy coding! ✌️